diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index 69ab037c5..2fff214c5 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -241,6 +241,7 @@ public class BaseTransactionalMap { protected RFuture addAndGetOperationAsync(K key, Number value) { RPromise result = new RedissonPromise(); + long threadId = Thread.currentThread().getId(); executeLocked(result, key, new Runnable() { @Override public void run() { @@ -253,7 +254,7 @@ public class BaseTransactionalMap { } BigDecimal res = currentValue.add(new BigDecimal(value.toString())); - operations.add(new MapAddAndGetOperation(map, key, value, transactionId)); + operations.add(new MapAddAndGetOperation(map, key, value, transactionId, threadId)); state.put(keyHash, new MapEntry(key, res)); if (deleted != null) { deleted = false; @@ -272,7 +273,7 @@ public class BaseTransactionalMap { BigDecimal currentValue = new BigDecimal(r.toString()); BigDecimal res = currentValue.add(new BigDecimal(value.toString())); - operations.add(new MapAddAndGetOperation(map, key, value, transactionId)); + operations.add(new MapAddAndGetOperation(map, key, value, transactionId, threadId)); state.put(keyHash, new MapEntry(key, res)); if (deleted != null) { deleted = false; @@ -287,7 +288,8 @@ public class BaseTransactionalMap { } protected RFuture putIfAbsentOperationAsync(K key, V value) { - return putIfAbsentOperationAsync(key, value, new MapPutIfAbsentOperation(map, key, value, transactionId)); + long threadId = Thread.currentThread().getId(); + return putIfAbsentOperationAsync(key, value, new MapPutIfAbsentOperation(map, key, value, transactionId, threadId)); } protected RFuture putIfAbsentOperationAsync(K key, V value, MapOperation mapOperation) { @@ -333,7 +335,8 @@ public class BaseTransactionalMap { } protected final RFuture putOperationAsync(K key, V value) { - return putOperationAsync(key, value, new MapPutOperation(map, key, value, transactionId)); + long threadId = Thread.currentThread().getId(); + return putOperationAsync(key, value, new MapPutOperation(map, key, value, transactionId, threadId)); } protected RFuture putOperationAsync(K key, V value, MapOperation operation) { @@ -377,7 +380,8 @@ public class BaseTransactionalMap { } protected RFuture fastPutIfAbsentOperationAsync(K key, V value) { - return fastPutIfAbsentOperationAsync(key, value, new MapFastPutIfAbsentOperation(map, key, value, transactionId)); + long threadId = Thread.currentThread().getId(); + return fastPutIfAbsentOperationAsync(key, value, new MapFastPutIfAbsentOperation(map, key, value, transactionId, threadId)); } protected RFuture fastPutIfAbsentOperationAsync(K key, V value, MapOperation mapOperation) { @@ -423,7 +427,8 @@ public class BaseTransactionalMap { } protected RFuture fastPutOperationAsync(K key, V value) { - return fastPutOperationAsync(key, value, new MapFastPutOperation(map, key, value, transactionId)); + long threadId = Thread.currentThread().getId(); + return fastPutOperationAsync(key, value, new MapFastPutOperation(map, key, value, transactionId, threadId)); } protected RFuture fastPutOperationAsync(K key, V value, MapOperation operation) { @@ -471,6 +476,7 @@ public class BaseTransactionalMap { @SuppressWarnings("unchecked") protected RFuture fastRemoveOperationAsync(K... keys) { RPromise result = new RedissonPromise(); + long threadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { @@ -481,7 +487,7 @@ public class BaseTransactionalMap { HashValue keyHash = toKeyHash(key); MapEntry currentValue = state.get(keyHash); if (currentValue != null && currentValue != MapEntry.NULL) { - operations.add(new MapFastRemoveOperation(map, key, transactionId)); + operations.add(new MapFastRemoveOperation(map, key, transactionId, threadId)); state.put(keyHash, MapEntry.NULL); counter.incrementAndGet(); @@ -498,7 +504,7 @@ public class BaseTransactionalMap { for (K key : res.keySet()) { HashValue keyHash = toKeyHash(key); - operations.add(new MapFastRemoveOperation(map, key, transactionId)); + operations.add(new MapFastRemoveOperation(map, key, transactionId, threadId)); counter.incrementAndGet(); state.put(keyHash, MapEntry.NULL); } @@ -674,13 +680,14 @@ public class BaseTransactionalMap { protected RFuture removeOperationAsync(K key) { RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); executeLocked(result, key, new Runnable() { @Override public void run() { HashValue keyHash = toKeyHash(key); MapEntry entry = state.get(keyHash); if (entry != null) { - operations.add(new MapRemoveOperation(map, key, transactionId)); + operations.add(new MapRemoveOperation(map, key, transactionId, threadId)); if (entry == MapEntry.NULL) { result.trySuccess(null); } else { @@ -695,7 +702,7 @@ public class BaseTransactionalMap { result.tryFailure(e); return; } - operations.add(new MapRemoveOperation(map, key, transactionId)); + operations.add(new MapRemoveOperation(map, key, transactionId, threadId)); if (res != null) { state.put(keyHash, MapEntry.NULL); } @@ -709,6 +716,7 @@ public class BaseTransactionalMap { protected RFuture removeOperationAsync(Object key, Object value) { RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); executeLocked(result, (K) key, new Runnable() { @Override public void run() { @@ -720,7 +728,7 @@ public class BaseTransactionalMap { return; } - operations.add(new MapRemoveOperation(map, key, value, transactionId)); + operations.add(new MapRemoveOperation(map, key, value, transactionId, threadId)); if (isEqual(entry.getValue(), value)) { state.put(keyHash, MapEntry.NULL); result.trySuccess(true); @@ -736,7 +744,7 @@ public class BaseTransactionalMap { result.tryFailure(e); return; } - operations.add(new MapRemoveOperation(map, key, value, transactionId)); + operations.add(new MapRemoveOperation(map, key, value, transactionId, threadId)); boolean res = isEqual(r, value); if (res) { state.put(keyHash, MapEntry.NULL); @@ -762,11 +770,12 @@ public class BaseTransactionalMap { protected RFuture putAllOperationAsync(Map entries) { RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { for (Entry entry : entries.entrySet()) { - operations.add(new MapPutOperation(map, entry.getKey(), entry.getValue(), transactionId)); + operations.add(new MapPutOperation(map, entry.getKey(), entry.getValue(), transactionId, threadId)); HashValue keyHash = toKeyHash(entry.getKey()); state.put(keyHash, new MapEntry(entry.getKey(), entry.getValue())); } @@ -783,6 +792,7 @@ public class BaseTransactionalMap { protected RFuture replaceOperationAsync(K key, V oldValue, V newValue) { RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); executeLocked(result, key, new Runnable() { @Override public void run() { @@ -794,7 +804,7 @@ public class BaseTransactionalMap { return; } - operations.add(new MapReplaceOperation(map, key, newValue, oldValue, transactionId)); + operations.add(new MapReplaceOperation(map, key, newValue, oldValue, transactionId, threadId)); if (isEqual(entry.getValue(), oldValue)) { state.put(keyHash, new MapEntry(key, newValue)); result.trySuccess(true); @@ -811,7 +821,7 @@ public class BaseTransactionalMap { return; } - operations.add(new MapReplaceOperation(map, key, newValue, oldValue, transactionId)); + operations.add(new MapReplaceOperation(map, key, newValue, oldValue, transactionId, threadId)); boolean res = isEqual(r, oldValue); if (res) { state.put(keyHash, new MapEntry(key, newValue)); @@ -825,12 +835,13 @@ public class BaseTransactionalMap { protected RFuture replaceOperationAsync(K key, V value) { RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); executeLocked(result, key, new Runnable() { @Override public void run() { HashValue keyHash = toKeyHash(key); MapEntry entry = state.get(keyHash); - operations.add(new MapReplaceOperation(map, key, value, transactionId)); + operations.add(new MapReplaceOperation(map, key, value, transactionId, threadId)); if (entry != null) { if (entry == MapEntry.NULL) { result.trySuccess(null); @@ -848,7 +859,7 @@ public class BaseTransactionalMap { return; } - operations.add(new MapReplaceOperation(map, key, value, transactionId)); + operations.add(new MapReplaceOperation(map, key, value, transactionId, threadId)); if (res != null) { state.put(keyHash, new MapEntry(key, value)); } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMapCache.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMapCache.java index 9a7e0eaf1..0c66050a9 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMapCache.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMapCache.java @@ -41,21 +41,25 @@ public class BaseTransactionalMapCache extends BaseTransactionalMap } public RFuture putIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { - return putIfAbsentOperationAsync(key, value, new MapCachePutIfAbsentOperation(map, key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId)); + long threadId = Thread.currentThread().getId(); + return putIfAbsentOperationAsync(key, value, new MapCachePutIfAbsentOperation(map, key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId, threadId)); } public RFuture fastPutOperationAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { - return fastPutOperationAsync(key, value, new MapCacheFastPutOperation(map, key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId)); + long threadId = Thread.currentThread().getId(); + return fastPutOperationAsync(key, value, new MapCacheFastPutOperation(map, key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId, threadId)); } public RFuture putOperationAsync(K key, V value, long ttlTimeout, long maxIdleTimeout, long maxIdleDelta) { + long threadId = Thread.currentThread().getId(); return putOperationAsync(key, value, new MapCachePutOperation(map, key, value, - ttlTimeout, TimeUnit.MILLISECONDS, maxIdleTimeout, TimeUnit.MILLISECONDS, transactionId)); + ttlTimeout, TimeUnit.MILLISECONDS, maxIdleTimeout, TimeUnit.MILLISECONDS, transactionId, threadId)); } public RFuture fastPutIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { + long threadId = Thread.currentThread().getId(); return fastPutIfAbsentOperationAsync(key, value, new MapCacheFastPutIfAbsentOperation(map, key, value, - ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId)); + ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId, threadId)); } } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java index 07ac15556..826e19e26 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java @@ -15,25 +15,11 @@ */ package org.redisson.transaction; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.TimeUnit; - +import io.netty.buffer.ByteBuf; import org.redisson.RedissonMultiLock; import org.redisson.RedissonObject; import org.redisson.RedissonSet; -import org.redisson.api.RCollectionAsync; -import org.redisson.api.RFuture; -import org.redisson.api.RLock; -import org.redisson.api.RObject; -import org.redisson.api.RSet; -import org.redisson.api.SortOrder; +import org.redisson.api.*; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; @@ -47,7 +33,9 @@ import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.UnlinkOperation; import org.redisson.transaction.operation.set.MoveOperation; -import io.netty.buffer.ByteBuf; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; /** * @@ -226,7 +214,8 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { } public RFuture addAsync(V value) { - TransactionalOperation operation = createAddOperation(value); + long threadId = Thread.currentThread().getId(); + TransactionalOperation operation = createAddOperation(value, threadId); return addAsync(value, operation); } @@ -266,7 +255,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { return result; } - protected abstract TransactionalOperation createAddOperation(V value); + protected abstract TransactionalOperation createAddOperation(V value, long threadId); public RFuture removeRandomAsync() { throw new UnsupportedOperationException(); @@ -328,13 +317,14 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { public RFuture removeAsync(Object value) { RPromise result = new RedissonPromise(); + long threadId = Thread.currentThread().getId(); executeLocked(result, (V) value, new Runnable() { @Override public void run() { HashValue keyHash = toHash(value); Object currentValue = state.get(keyHash); if (currentValue != null) { - operations.add(createRemoveOperation(value)); + operations.add(createRemoveOperation(value, threadId)); if (currentValue == NULL) { result.trySuccess(false); } else { @@ -350,7 +340,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { return; } - operations.add(createRemoveOperation(value)); + operations.add(createRemoveOperation(value, threadId)); if (res) { state.put(keyHash, NULL); } @@ -364,7 +354,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { } - protected abstract TransactionalOperation createRemoveOperation(Object value); + protected abstract TransactionalOperation createRemoveOperation(Object value, long threadId); public RFuture containsAllAsync(Collection c) { List coll = new ArrayList(c); @@ -383,6 +373,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { public RFuture addAllAsync(Collection c) { RPromise result = new RedissonPromise(); + long threadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { @@ -393,7 +384,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { } for (V value : c) { - operations.add(createAddOperation(value)); + operations.add(createAddOperation(value, threadId)); HashValue keyHash = toHash(value); state.put(keyHash, value); } @@ -415,6 +406,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { public RFuture removeAllAsync(Collection c) { RPromise result = new RedissonPromise(); + long threadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { @@ -425,7 +417,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { } for (Object value : c) { - operations.add(createRemoveOperation(value)); + operations.add(createRemoveOperation(value, threadId)); HashValue keyHash = toHash(value); state.put(keyHash, NULL); } diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java index 3f674eccd..a461b0ef1 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBucket.java @@ -15,11 +15,7 @@ */ package org.redisson.transaction; -import java.util.Date; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - +import io.netty.buffer.ByteBuf; import org.redisson.RedissonBucket; import org.redisson.api.RFuture; import org.redisson.api.RLock; @@ -31,13 +27,12 @@ import org.redisson.transaction.operation.DeleteOperation; import org.redisson.transaction.operation.TouchOperation; import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.UnlinkOperation; -import org.redisson.transaction.operation.bucket.BucketCompareAndSetOperation; -import org.redisson.transaction.operation.bucket.BucketGetAndDeleteOperation; -import org.redisson.transaction.operation.bucket.BucketGetAndSetOperation; -import org.redisson.transaction.operation.bucket.BucketSetOperation; -import org.redisson.transaction.operation.bucket.BucketTrySetOperation; +import org.redisson.transaction.operation.bucket.*; -import io.netty.buffer.ByteBuf; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -196,11 +191,12 @@ public class RedissonTransactionalBucket extends RedissonBucket { public RFuture deleteAsync() { checkState(); RPromise result = new RedissonPromise(); + long threadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { if (state != null) { - operations.add(new DeleteOperation(getName(), getLockName(), transactionId)); + operations.add(new DeleteOperation(getName(), getLockName(), transactionId, threadId)); if (state == NULL) { result.trySuccess(false); } else { @@ -216,7 +212,7 @@ public class RedissonTransactionalBucket extends RedissonBucket { return; } - operations.add(new DeleteOperation(getName(), getLockName(), transactionId)); + operations.add(new DeleteOperation(getName(), getLockName(), transactionId, threadId)); state = NULL; result.trySuccess(res); }); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java index 48161d08f..abf82f118 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java @@ -141,6 +141,7 @@ public class RedissonTransactionalBuckets extends RedissonBuckets { public RFuture deleteAsync(String... keys) { checkState(); RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); executeLocked(result, new Runnable() { @Override public void run() { @@ -149,7 +150,7 @@ public class RedissonTransactionalBuckets extends RedissonBuckets { for (String key : keys) { Object st = state.get(key); if (st != null) { - operations.add(new DeleteOperation(key, getLockName(key), transactionId)); + operations.add(new DeleteOperation(key, getLockName(key), transactionId, threadId)); if (st != NULL) { state.put(key, NULL); counter.incrementAndGet(); @@ -168,7 +169,7 @@ public class RedissonTransactionalBuckets extends RedissonBuckets { } if (res > 0) { - operations.add(new DeleteOperation(key, getLockName(key), transactionId)); + operations.add(new DeleteOperation(key, getLockName(key), transactionId, threadId)); state.put(key, NULL); counter.incrementAndGet(); } diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java index fdeb05a57..07c57f3a8 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java @@ -61,8 +61,8 @@ public class TransactionalSet extends BaseTransactionalSet { } @Override - protected TransactionalOperation createAddOperation(V value) { - return new AddOperation(set, value, transactionId); + protected TransactionalOperation createAddOperation(V value, long threadId) { + return new AddOperation(set, value, transactionId, threadId); } @Override @@ -71,8 +71,8 @@ public class TransactionalSet extends BaseTransactionalSet { } @Override - protected TransactionalOperation createRemoveOperation(Object value) { - return new RemoveOperation(set, value, transactionId); + protected TransactionalOperation createRemoveOperation(Object value, long threadId) { + return new RemoveOperation(set, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java index ec3034734..0a95a114d 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java @@ -62,12 +62,13 @@ public class TransactionalSetCache extends BaseTransactionalSet { } public RFuture addAsync(V value, long ttl, TimeUnit ttlUnit) { - return addAsync(value, new AddCacheOperation(set, value, ttl, ttlUnit, transactionId)); + long threadId = Thread.currentThread().getId(); + return addAsync(value, new AddCacheOperation(set, value, ttl, ttlUnit, transactionId, threadId)); } @Override - protected TransactionalOperation createAddOperation(V value) { - return new AddCacheOperation(set, value, transactionId); + protected TransactionalOperation createAddOperation(V value, long threadId) { + return new AddCacheOperation(set, value, transactionId, threadId); } @Override @@ -76,8 +77,8 @@ public class TransactionalSetCache extends BaseTransactionalSet { } @Override - protected TransactionalOperation createRemoveOperation(Object value) { - return new RemoveCacheOperation(set, value, transactionId); + protected TransactionalOperation createRemoveOperation(Object value, long threadId) { + return new RemoveCacheOperation(set, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java index 6ed866717..16fc82bd0 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/DeleteOperation.java @@ -30,15 +30,17 @@ public class DeleteOperation extends TransactionalOperation { private String lockName; private String transactionId; + private long threadId; public DeleteOperation(String name) { - this(name, null, null); + this(name, null, null, 0); } - public DeleteOperation(String name, String lockName, String transactionId) { + public DeleteOperation(String name, String lockName, String transactionId, long threadId) { super(name, null); this.lockName = lockName; this.transactionId = transactionId; + this.threadId = threadId; } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapAddAndGetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapAddAndGetOperation.java index 67f8ccb4d..054e3cc7b 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapAddAndGetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapAddAndGetOperation.java @@ -27,8 +27,8 @@ public class MapAddAndGetOperation extends MapOperation { public MapAddAndGetOperation() { } - public MapAddAndGetOperation(RMap map, Object key, Object value, String transactionId) { - super(map, key, value, transactionId); + public MapAddAndGetOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapCacheFastPutIfAbsentOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapCacheFastPutIfAbsentOperation.java index 1941b87ed..373c4aa8d 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapCacheFastPutIfAbsentOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapCacheFastPutIfAbsentOperation.java @@ -33,8 +33,8 @@ public class MapCacheFastPutIfAbsentOperation extends MapOperation { private TimeUnit maxIdleUnit; public MapCacheFastPutIfAbsentOperation(RMap map, Object key, Object value, long ttl, TimeUnit ttlUnit, - long maxIdleTime, TimeUnit maxIdleUnit, String transactionId) { - super(map, key, value, transactionId); + long maxIdleTime, TimeUnit maxIdleUnit, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); this.ttl = ttl; this.ttlUnit = ttlUnit; this.maxIdleTime = maxIdleTime; diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapCacheFastPutOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapCacheFastPutOperation.java index fe8685cce..8b3110319 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapCacheFastPutOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapCacheFastPutOperation.java @@ -35,8 +35,8 @@ public class MapCacheFastPutOperation extends MapOperation { public MapCacheFastPutOperation() { } - public MapCacheFastPutOperation(RMap map, Object key, Object value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit, String transactionId) { - super(map, key, value, transactionId); + public MapCacheFastPutOperation(RMap map, Object key, Object value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); this.ttl = ttl; this.ttlUnit = ttlUnit; this.maxIdleTime = maxIdleTime; diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapCachePutIfAbsentOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapCachePutIfAbsentOperation.java index f61d484e6..fdf2afc3c 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapCachePutIfAbsentOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapCachePutIfAbsentOperation.java @@ -36,16 +36,16 @@ public class MapCachePutIfAbsentOperation extends MapOperation { } public MapCachePutIfAbsentOperation(RMap map, Object key, Object value, - long ttl, TimeUnit unit, long maxIdleTime, TimeUnit maxIdleUnit, String transactionId) { - this(map, key, value, transactionId); + long ttl, TimeUnit unit, long maxIdleTime, TimeUnit maxIdleUnit, String transactionId, long threadId) { + this(map, key, value, transactionId, threadId); this.ttl = ttl; this.unit = unit; this.maxIdleTime = maxIdleTime; this.maxIdleUnit = maxIdleUnit; } - public MapCachePutIfAbsentOperation(RMap map, Object key, Object value, String transactionId) { - super(map, key, value, transactionId); + public MapCachePutIfAbsentOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapCachePutOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapCachePutOperation.java index 0cfec77f7..74d03cbab 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapCachePutOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapCachePutOperation.java @@ -35,8 +35,8 @@ public class MapCachePutOperation extends MapOperation { public MapCachePutOperation() { } - public MapCachePutOperation(RMap map, Object key, Object value, long ttlTimeout, TimeUnit ttlUnit, long maxIdleTimeout, TimeUnit maxIdleUnit, String transactionId) { - super(map, key, value, transactionId); + public MapCachePutOperation(RMap map, Object key, Object value, long ttlTimeout, TimeUnit ttlUnit, long maxIdleTimeout, TimeUnit maxIdleUnit, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); this.ttlTimeout = ttlTimeout; this.ttlUnit = ttlUnit; this.maxIdleTimeout = maxIdleTimeout; diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastPutIfAbsentOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastPutIfAbsentOperation.java index 16cbe46e4..7a52054e4 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastPutIfAbsentOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastPutIfAbsentOperation.java @@ -27,8 +27,8 @@ public class MapFastPutIfAbsentOperation extends MapOperation { public MapFastPutIfAbsentOperation() { } - public MapFastPutIfAbsentOperation(RMap map, Object key, Object value, String transactionId) { - super(map, key, value, transactionId); + public MapFastPutIfAbsentOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastPutOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastPutOperation.java index 0545243cd..7775eb2d4 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastPutOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastPutOperation.java @@ -27,8 +27,8 @@ public class MapFastPutOperation extends MapOperation { public MapFastPutOperation() { } - public MapFastPutOperation(RMap map, Object key, Object value, String transactionId) { - super(map, key, value, transactionId); + public MapFastPutOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastRemoveOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastRemoveOperation.java index ea7f0331f..07a2a0af9 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastRemoveOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapFastRemoveOperation.java @@ -27,8 +27,8 @@ public class MapFastRemoveOperation extends MapOperation { public MapFastRemoveOperation() { } - public MapFastRemoveOperation(RMap map, Object key, String transactionId) { - super(map, key, null, transactionId); + public MapFastRemoveOperation(RMap map, Object key, String transactionId, long threadId) { + super(map, key, null, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapOperation.java index 20f71cfea..b30defd02 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapOperation.java @@ -36,21 +36,23 @@ public abstract class MapOperation extends TransactionalOperation { Object oldValue; RMap map; String transactionId; - + long threadId; + public MapOperation() { } - public MapOperation(RMap map, Object key, Object value, String transactionId) { - this(map, key, value, null, transactionId); + public MapOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + this(map, key, value, null, transactionId, threadId); } - public MapOperation(RMap map, Object key, Object value, Object oldValue, String transactionId) { + public MapOperation(RMap map, Object key, Object value, Object oldValue, String transactionId, long threadId) { super(map.getName(), map.getCodec()); this.map = map; this.key = key; this.value = value; this.oldValue = oldValue; this.transactionId = transactionId; + this.threadId = threadId; } public Object getKey() { @@ -65,20 +67,20 @@ public abstract class MapOperation extends TransactionalOperation { public final void commit(CommandAsyncExecutor commandExecutor) { RMap map = getMap(commandExecutor); commit(map); - getLock(map, commandExecutor, key).unlockAsync(); + getLock(map, commandExecutor, key).unlockAsync(threadId); } protected RMap getMap(CommandAsyncExecutor commandExecutor) { if (map instanceof RMapCache) { - return new RedissonMapCache(codec, null, commandExecutor, name, null, null, null); + return new RedissonMapCache<>(codec, null, commandExecutor, name, null, null, null); } - return new RedissonMap(codec, commandExecutor, name, null, null, null); + return new RedissonMap<>(codec, commandExecutor, name, null, null, null); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RMap map = getMap(commandExecutor); - getLock(map, commandExecutor, key).unlockAsync(); + getLock(map, commandExecutor, key).unlockAsync(threadId); } protected RLock getLock(RMap map, CommandAsyncExecutor commandExecutor, Object key) { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutIfAbsentOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutIfAbsentOperation.java index 64487d991..582dfa6e2 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutIfAbsentOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutIfAbsentOperation.java @@ -27,8 +27,8 @@ public class MapPutIfAbsentOperation extends MapOperation { public MapPutIfAbsentOperation() { } - public MapPutIfAbsentOperation(RMap map, Object key, Object value, String transactionId) { - super(map, key, value, transactionId); + public MapPutIfAbsentOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutOperation.java index b8c282483..391db2f5c 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutOperation.java @@ -27,8 +27,8 @@ public class MapPutOperation extends MapOperation { public MapPutOperation() { } - public MapPutOperation(RMap map, Object key, Object value, String transactionId) { - super(map, key, value, transactionId); + public MapPutOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapRemoveOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapRemoveOperation.java index 97597e2f9..8fef642b4 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapRemoveOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapRemoveOperation.java @@ -27,12 +27,12 @@ public class MapRemoveOperation extends MapOperation { public MapRemoveOperation() { } - public MapRemoveOperation(RMap map, Object key, String transactionId) { - super(map, key, null, transactionId); + public MapRemoveOperation(RMap map, Object key, String transactionId, long threadId) { + super(map, key, null, transactionId, threadId); } - public MapRemoveOperation(RMap map, Object key, Object value, String transactionId) { - super(map, key, value, transactionId); + public MapRemoveOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapReplaceOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapReplaceOperation.java index 07a689aeb..18d698d69 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/map/MapReplaceOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapReplaceOperation.java @@ -27,12 +27,12 @@ public class MapReplaceOperation extends MapOperation { public MapReplaceOperation() { } - public MapReplaceOperation(RMap map, Object key, Object value, Object oldValue, String transactionId) { - super(map, key, value, oldValue, transactionId); + public MapReplaceOperation(RMap map, Object key, Object value, Object oldValue, String transactionId, long threadId) { + super(map, key, value, oldValue, transactionId, threadId); } - public MapReplaceOperation(RMap map, Object key, Object value, String transactionId) { - super(map, key, value, transactionId); + public MapReplaceOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/set/AddCacheOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/set/AddCacheOperation.java index a6b1f97cf..4c166723e 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/set/AddCacheOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/set/AddCacheOperation.java @@ -33,37 +33,39 @@ public class AddCacheOperation extends SetOperation { private Object value; private long ttl; private TimeUnit timeUnit; + private long threadId; - public AddCacheOperation(RObject set, Object value, String transactionId) { - this(set, value, 0, null, transactionId); + public AddCacheOperation(RObject set, Object value, String transactionId, long threadId) { + this(set, value, 0, null, transactionId, threadId); } - public AddCacheOperation(RObject set, Object value, long ttl, TimeUnit timeUnit, String transactionId) { - this(set.getName(), set.getCodec(), value, ttl, timeUnit, transactionId); + public AddCacheOperation(RObject set, Object value, long ttl, TimeUnit timeUnit, String transactionId, long threadId) { + this(set.getName(), set.getCodec(), value, ttl, timeUnit, transactionId, threadId); } - public AddCacheOperation(String name, Codec codec, Object value, long ttl, TimeUnit timeUnit, String transactionId) { + public AddCacheOperation(String name, Codec codec, Object value, long ttl, TimeUnit timeUnit, String transactionId, long threadId) { super(name, codec, transactionId); this.value = value; this.timeUnit = timeUnit; this.ttl = ttl; + this.threadId = threadId; } @Override public void commit(CommandAsyncExecutor commandExecutor) { - RSetCache set = new RedissonSetCache(codec, null, commandExecutor, name, null); + RSetCache set = new RedissonSetCache<>(codec, null, commandExecutor, name, null); if (timeUnit != null) { set.addAsync(value, ttl, timeUnit); } else { set.addAsync(value); } - getLock(set, commandExecutor, value).unlockAsync(); + getLock(set, commandExecutor, value).unlockAsync(threadId); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RSetCache set = new RedissonSetCache(codec, null, commandExecutor, name, null); - getLock(set, commandExecutor, value).unlockAsync(); + getLock(set, commandExecutor, value).unlockAsync(threadId); } public Object getValue() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/set/AddOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/set/AddOperation.java index 57d5b0fbc..bbdf5b61c 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/set/AddOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/set/AddOperation.java @@ -29,14 +29,16 @@ import org.redisson.command.CommandAsyncExecutor; public class AddOperation extends SetOperation { private Object value; + private long threadId; - public AddOperation(RObject set, Object value, String transactionId) { - this(set.getName(), set.getCodec(), value, transactionId); + public AddOperation(RObject set, Object value, String transactionId, long threadId) { + this(set.getName(), set.getCodec(), value, transactionId, threadId); } - public AddOperation(String name, Codec codec, Object value, String transactionId) { + public AddOperation(String name, Codec codec, Object value, String transactionId, long threadId) { super(name, codec, transactionId); this.value = value; + this.threadId = threadId; } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveCacheOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveCacheOperation.java index 6245edcdf..b6121b2df 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveCacheOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveCacheOperation.java @@ -29,27 +29,29 @@ import org.redisson.command.CommandAsyncExecutor; public class RemoveCacheOperation extends SetOperation { private Object value; - - public RemoveCacheOperation(RObject set, Object value, String transactionId) { - this(set.getName(), set.getCodec(), value, transactionId); + private long threadId; + + public RemoveCacheOperation(RObject set, Object value, String transactionId, long threadId) { + this(set.getName(), set.getCodec(), value, transactionId, threadId); } - public RemoveCacheOperation(String name, Codec codec, Object value, String transactionId) { + public RemoveCacheOperation(String name, Codec codec, Object value, String transactionId, long threadId) { super(name, codec, transactionId); this.value = value; + this.threadId = threadId; } @Override public void commit(CommandAsyncExecutor commandExecutor) { RSetCache set = new RedissonSetCache(codec, null, commandExecutor, name, null); set.removeAsync(value); - getLock(set, commandExecutor, value).unlockAsync(); + getLock(set, commandExecutor, value).unlockAsync(threadId); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { RSetCache set = new RedissonSetCache(codec, null, commandExecutor, name, null); - getLock(set, commandExecutor, value).unlockAsync(); + getLock(set, commandExecutor, value).unlockAsync(threadId); } public Object getValue() { diff --git a/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveOperation.java index 5ac9f233d..4b66a5ed7 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/set/RemoveOperation.java @@ -29,27 +29,29 @@ import org.redisson.command.CommandAsyncExecutor; public class RemoveOperation extends SetOperation { private Object value; + private long threadId; - public RemoveOperation(RObject set, Object value, String transactionId) { - this(set.getName(), set.getCodec(), value, transactionId); + public RemoveOperation(RObject set, Object value, String transactionId, long threadId) { + this(set.getName(), set.getCodec(), value, transactionId, threadId); } - public RemoveOperation(String name, Codec codec, Object value, String transactionId) { + public RemoveOperation(String name, Codec codec, Object value, String transactionId, long threadId) { super(name, codec, transactionId); this.value = value; + this.threadId = threadId; } @Override public void commit(CommandAsyncExecutor commandExecutor) { - RSet set = new RedissonSet(codec, commandExecutor, name, null); + RSet set = new RedissonSet<>(codec, commandExecutor, name, null); set.removeAsync(value); - getLock(set, commandExecutor, value).unlockAsync(); + getLock(set, commandExecutor, value).unlockAsync(threadId); } @Override public void rollback(CommandAsyncExecutor commandExecutor) { - RSet set = new RedissonSet(codec, commandExecutor, name, null); - getLock(set, commandExecutor, value).unlockAsync(); + RSet set = new RedissonSet<>(codec, commandExecutor, name, null); + getLock(set, commandExecutor, value).unlockAsync(threadId); } public Object getValue() {