Fixed - Reactive/Rxjava2 transaction doesn't unlock lock on rollback or commit #2483.

pull/2452/merge
Nikita Koksharov 5 years ago
parent 9ee2fcc04e
commit e04daf5715

@ -241,6 +241,7 @@ public class BaseTransactionalMap<K, V> {
protected RFuture<V> addAndGetOperationAsync(K key, Number value) {
RPromise<V> result = new RedissonPromise<V>();
long threadId = Thread.currentThread().getId();
executeLocked(result, key, new Runnable() {
@Override
public void run() {
@ -253,7 +254,7 @@ public class BaseTransactionalMap<K, V> {
}
BigDecimal res = currentValue.add(new BigDecimal(value.toString()));
operations.add(new MapAddAndGetOperation(map, key, value, transactionId));
operations.add(new MapAddAndGetOperation(map, key, value, transactionId, threadId));
state.put(keyHash, new MapEntry(key, res));
if (deleted != null) {
deleted = false;
@ -272,7 +273,7 @@ public class BaseTransactionalMap<K, V> {
BigDecimal currentValue = new BigDecimal(r.toString());
BigDecimal res = currentValue.add(new BigDecimal(value.toString()));
operations.add(new MapAddAndGetOperation(map, key, value, transactionId));
operations.add(new MapAddAndGetOperation(map, key, value, transactionId, threadId));
state.put(keyHash, new MapEntry(key, res));
if (deleted != null) {
deleted = false;
@ -287,7 +288,8 @@ public class BaseTransactionalMap<K, V> {
}
protected RFuture<V> putIfAbsentOperationAsync(K key, V value) {
return putIfAbsentOperationAsync(key, value, new MapPutIfAbsentOperation(map, key, value, transactionId));
long threadId = Thread.currentThread().getId();
return putIfAbsentOperationAsync(key, value, new MapPutIfAbsentOperation(map, key, value, transactionId, threadId));
}
protected RFuture<V> putIfAbsentOperationAsync(K key, V value, MapOperation mapOperation) {
@ -333,7 +335,8 @@ public class BaseTransactionalMap<K, V> {
}
protected final RFuture<V> putOperationAsync(K key, V value) {
return putOperationAsync(key, value, new MapPutOperation(map, key, value, transactionId));
long threadId = Thread.currentThread().getId();
return putOperationAsync(key, value, new MapPutOperation(map, key, value, transactionId, threadId));
}
protected RFuture<V> putOperationAsync(K key, V value, MapOperation operation) {
@ -377,7 +380,8 @@ public class BaseTransactionalMap<K, V> {
}
protected RFuture<Boolean> fastPutIfAbsentOperationAsync(K key, V value) {
return fastPutIfAbsentOperationAsync(key, value, new MapFastPutIfAbsentOperation(map, key, value, transactionId));
long threadId = Thread.currentThread().getId();
return fastPutIfAbsentOperationAsync(key, value, new MapFastPutIfAbsentOperation(map, key, value, transactionId, threadId));
}
protected RFuture<Boolean> fastPutIfAbsentOperationAsync(K key, V value, MapOperation mapOperation) {
@ -423,7 +427,8 @@ public class BaseTransactionalMap<K, V> {
}
protected RFuture<Boolean> fastPutOperationAsync(K key, V value) {
return fastPutOperationAsync(key, value, new MapFastPutOperation(map, key, value, transactionId));
long threadId = Thread.currentThread().getId();
return fastPutOperationAsync(key, value, new MapFastPutOperation(map, key, value, transactionId, threadId));
}
protected RFuture<Boolean> fastPutOperationAsync(K key, V value, MapOperation operation) {
@ -471,6 +476,7 @@ public class BaseTransactionalMap<K, V> {
@SuppressWarnings("unchecked")
protected RFuture<Long> fastRemoveOperationAsync(K... keys) {
RPromise<Long> result = new RedissonPromise<Long>();
long threadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
@ -481,7 +487,7 @@ public class BaseTransactionalMap<K, V> {
HashValue keyHash = toKeyHash(key);
MapEntry currentValue = state.get(keyHash);
if (currentValue != null && currentValue != MapEntry.NULL) {
operations.add(new MapFastRemoveOperation(map, key, transactionId));
operations.add(new MapFastRemoveOperation(map, key, transactionId, threadId));
state.put(keyHash, MapEntry.NULL);
counter.incrementAndGet();
@ -498,7 +504,7 @@ public class BaseTransactionalMap<K, V> {
for (K key : res.keySet()) {
HashValue keyHash = toKeyHash(key);
operations.add(new MapFastRemoveOperation(map, key, transactionId));
operations.add(new MapFastRemoveOperation(map, key, transactionId, threadId));
counter.incrementAndGet();
state.put(keyHash, MapEntry.NULL);
}
@ -674,13 +680,14 @@ public class BaseTransactionalMap<K, V> {
protected RFuture<V> removeOperationAsync(K key) {
RPromise<V> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
executeLocked(result, key, new Runnable() {
@Override
public void run() {
HashValue keyHash = toKeyHash(key);
MapEntry entry = state.get(keyHash);
if (entry != null) {
operations.add(new MapRemoveOperation(map, key, transactionId));
operations.add(new MapRemoveOperation(map, key, transactionId, threadId));
if (entry == MapEntry.NULL) {
result.trySuccess(null);
} else {
@ -695,7 +702,7 @@ public class BaseTransactionalMap<K, V> {
result.tryFailure(e);
return;
}
operations.add(new MapRemoveOperation(map, key, transactionId));
operations.add(new MapRemoveOperation(map, key, transactionId, threadId));
if (res != null) {
state.put(keyHash, MapEntry.NULL);
}
@ -709,6 +716,7 @@ public class BaseTransactionalMap<K, V> {
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
RPromise<Boolean> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
executeLocked(result, (K) key, new Runnable() {
@Override
public void run() {
@ -720,7 +728,7 @@ public class BaseTransactionalMap<K, V> {
return;
}
operations.add(new MapRemoveOperation(map, key, value, transactionId));
operations.add(new MapRemoveOperation(map, key, value, transactionId, threadId));
if (isEqual(entry.getValue(), value)) {
state.put(keyHash, MapEntry.NULL);
result.trySuccess(true);
@ -736,7 +744,7 @@ public class BaseTransactionalMap<K, V> {
result.tryFailure(e);
return;
}
operations.add(new MapRemoveOperation(map, key, value, transactionId));
operations.add(new MapRemoveOperation(map, key, value, transactionId, threadId));
boolean res = isEqual(r, value);
if (res) {
state.put(keyHash, MapEntry.NULL);
@ -762,11 +770,12 @@ public class BaseTransactionalMap<K, V> {
protected RFuture<Void> putAllOperationAsync(Map<? extends K, ? extends V> entries) {
RPromise<Void> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
for (Entry<? extends K, ? extends V> entry : entries.entrySet()) {
operations.add(new MapPutOperation(map, entry.getKey(), entry.getValue(), transactionId));
operations.add(new MapPutOperation(map, entry.getKey(), entry.getValue(), transactionId, threadId));
HashValue keyHash = toKeyHash(entry.getKey());
state.put(keyHash, new MapEntry(entry.getKey(), entry.getValue()));
}
@ -783,6 +792,7 @@ public class BaseTransactionalMap<K, V> {
protected RFuture<Boolean> replaceOperationAsync(K key, V oldValue, V newValue) {
RPromise<Boolean> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
executeLocked(result, key, new Runnable() {
@Override
public void run() {
@ -794,7 +804,7 @@ public class BaseTransactionalMap<K, V> {
return;
}
operations.add(new MapReplaceOperation(map, key, newValue, oldValue, transactionId));
operations.add(new MapReplaceOperation(map, key, newValue, oldValue, transactionId, threadId));
if (isEqual(entry.getValue(), oldValue)) {
state.put(keyHash, new MapEntry(key, newValue));
result.trySuccess(true);
@ -811,7 +821,7 @@ public class BaseTransactionalMap<K, V> {
return;
}
operations.add(new MapReplaceOperation(map, key, newValue, oldValue, transactionId));
operations.add(new MapReplaceOperation(map, key, newValue, oldValue, transactionId, threadId));
boolean res = isEqual(r, oldValue);
if (res) {
state.put(keyHash, new MapEntry(key, newValue));
@ -825,12 +835,13 @@ public class BaseTransactionalMap<K, V> {
protected RFuture<V> replaceOperationAsync(K key, V value) {
RPromise<V> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
executeLocked(result, key, new Runnable() {
@Override
public void run() {
HashValue keyHash = toKeyHash(key);
MapEntry entry = state.get(keyHash);
operations.add(new MapReplaceOperation(map, key, value, transactionId));
operations.add(new MapReplaceOperation(map, key, value, transactionId, threadId));
if (entry != null) {
if (entry == MapEntry.NULL) {
result.trySuccess(null);
@ -848,7 +859,7 @@ public class BaseTransactionalMap<K, V> {
return;
}
operations.add(new MapReplaceOperation(map, key, value, transactionId));
operations.add(new MapReplaceOperation(map, key, value, transactionId, threadId));
if (res != null) {
state.put(keyHash, new MapEntry(key, value));
}

@ -41,21 +41,25 @@ public class BaseTransactionalMapCache<K, V> extends BaseTransactionalMap<K, V>
}
public RFuture<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
return putIfAbsentOperationAsync(key, value, new MapCachePutIfAbsentOperation(map, key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId));
long threadId = Thread.currentThread().getId();
return putIfAbsentOperationAsync(key, value, new MapCachePutIfAbsentOperation(map, key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId, threadId));
}
public RFuture<Boolean> fastPutOperationAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
return fastPutOperationAsync(key, value, new MapCacheFastPutOperation(map, key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId));
long threadId = Thread.currentThread().getId();
return fastPutOperationAsync(key, value, new MapCacheFastPutOperation(map, key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId, threadId));
}
public RFuture<V> putOperationAsync(K key, V value, long ttlTimeout, long maxIdleTimeout, long maxIdleDelta) {
long threadId = Thread.currentThread().getId();
return putOperationAsync(key, value, new MapCachePutOperation(map, key, value,
ttlTimeout, TimeUnit.MILLISECONDS, maxIdleTimeout, TimeUnit.MILLISECONDS, transactionId));
ttlTimeout, TimeUnit.MILLISECONDS, maxIdleTimeout, TimeUnit.MILLISECONDS, transactionId, threadId));
}
public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
long threadId = Thread.currentThread().getId();
return fastPutIfAbsentOperationAsync(key, value, new MapCacheFastPutIfAbsentOperation(map, key, value,
ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId));
ttl, ttlUnit, maxIdleTime, maxIdleUnit, transactionId, threadId));
}
}

@ -15,25 +15,11 @@
*/
package org.redisson.transaction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import org.redisson.RedissonMultiLock;
import org.redisson.RedissonObject;
import org.redisson.RedissonSet;
import org.redisson.api.RCollectionAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RObject;
import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.api.*;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
@ -47,7 +33,9 @@ import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.UnlinkOperation;
import org.redisson.transaction.operation.set.MoveOperation;
import io.netty.buffer.ByteBuf;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
/**
*
@ -226,7 +214,8 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
}
public RFuture<Boolean> addAsync(V value) {
TransactionalOperation operation = createAddOperation(value);
long threadId = Thread.currentThread().getId();
TransactionalOperation operation = createAddOperation(value, threadId);
return addAsync(value, operation);
}
@ -266,7 +255,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return result;
}
protected abstract TransactionalOperation createAddOperation(V value);
protected abstract TransactionalOperation createAddOperation(V value, long threadId);
public RFuture<V> removeRandomAsync() {
throw new UnsupportedOperationException();
@ -328,13 +317,14 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
public RFuture<Boolean> removeAsync(Object value) {
RPromise<Boolean> result = new RedissonPromise<Boolean>();
long threadId = Thread.currentThread().getId();
executeLocked(result, (V) value, new Runnable() {
@Override
public void run() {
HashValue keyHash = toHash(value);
Object currentValue = state.get(keyHash);
if (currentValue != null) {
operations.add(createRemoveOperation(value));
operations.add(createRemoveOperation(value, threadId));
if (currentValue == NULL) {
result.trySuccess(false);
} else {
@ -350,7 +340,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return;
}
operations.add(createRemoveOperation(value));
operations.add(createRemoveOperation(value, threadId));
if (res) {
state.put(keyHash, NULL);
}
@ -364,7 +354,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
}
protected abstract TransactionalOperation createRemoveOperation(Object value);
protected abstract TransactionalOperation createRemoveOperation(Object value, long threadId);
public RFuture<Boolean> containsAllAsync(Collection<?> c) {
List<Object> coll = new ArrayList<Object>(c);
@ -383,6 +373,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
public RFuture<Boolean> addAllAsync(Collection<? extends V> c) {
RPromise<Boolean> result = new RedissonPromise<Boolean>();
long threadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
@ -393,7 +384,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
}
for (V value : c) {
operations.add(createAddOperation(value));
operations.add(createAddOperation(value, threadId));
HashValue keyHash = toHash(value);
state.put(keyHash, value);
}
@ -415,6 +406,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
public RFuture<Boolean> removeAllAsync(Collection<?> c) {
RPromise<Boolean> result = new RedissonPromise<Boolean>();
long threadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
@ -425,7 +417,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
}
for (Object value : c) {
operations.add(createRemoveOperation(value));
operations.add(createRemoveOperation(value, threadId));
HashValue keyHash = toHash(value);
state.put(keyHash, NULL);
}

@ -15,11 +15,7 @@
*/
package org.redisson.transaction;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import org.redisson.RedissonBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
@ -31,13 +27,12 @@ import org.redisson.transaction.operation.DeleteOperation;
import org.redisson.transaction.operation.TouchOperation;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.UnlinkOperation;
import org.redisson.transaction.operation.bucket.BucketCompareAndSetOperation;
import org.redisson.transaction.operation.bucket.BucketGetAndDeleteOperation;
import org.redisson.transaction.operation.bucket.BucketGetAndSetOperation;
import org.redisson.transaction.operation.bucket.BucketSetOperation;
import org.redisson.transaction.operation.bucket.BucketTrySetOperation;
import org.redisson.transaction.operation.bucket.*;
import io.netty.buffer.ByteBuf;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@ -196,11 +191,12 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
public RFuture<Boolean> deleteAsync() {
checkState();
RPromise<Boolean> result = new RedissonPromise<Boolean>();
long threadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
if (state != null) {
operations.add(new DeleteOperation(getName(), getLockName(), transactionId));
operations.add(new DeleteOperation(getName(), getLockName(), transactionId, threadId));
if (state == NULL) {
result.trySuccess(false);
} else {
@ -216,7 +212,7 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
return;
}
operations.add(new DeleteOperation(getName(), getLockName(), transactionId));
operations.add(new DeleteOperation(getName(), getLockName(), transactionId, threadId));
state = NULL;
result.trySuccess(res);
});

@ -141,6 +141,7 @@ public class RedissonTransactionalBuckets extends RedissonBuckets {
public RFuture<Long> deleteAsync(String... keys) {
checkState();
RPromise<Long> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
executeLocked(result, new Runnable() {
@Override
public void run() {
@ -149,7 +150,7 @@ public class RedissonTransactionalBuckets extends RedissonBuckets {
for (String key : keys) {
Object st = state.get(key);
if (st != null) {
operations.add(new DeleteOperation(key, getLockName(key), transactionId));
operations.add(new DeleteOperation(key, getLockName(key), transactionId, threadId));
if (st != NULL) {
state.put(key, NULL);
counter.incrementAndGet();
@ -168,7 +169,7 @@ public class RedissonTransactionalBuckets extends RedissonBuckets {
}
if (res > 0) {
operations.add(new DeleteOperation(key, getLockName(key), transactionId));
operations.add(new DeleteOperation(key, getLockName(key), transactionId, threadId));
state.put(key, NULL);
counter.incrementAndGet();
}

@ -61,8 +61,8 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
}
@Override
protected TransactionalOperation createAddOperation(V value) {
return new AddOperation(set, value, transactionId);
protected TransactionalOperation createAddOperation(V value, long threadId) {
return new AddOperation(set, value, transactionId, threadId);
}
@Override
@ -71,8 +71,8 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
}
@Override
protected TransactionalOperation createRemoveOperation(Object value) {
return new RemoveOperation(set, value, transactionId);
protected TransactionalOperation createRemoveOperation(Object value, long threadId) {
return new RemoveOperation(set, value, transactionId, threadId);
}
@Override

@ -62,12 +62,13 @@ public class TransactionalSetCache<V> extends BaseTransactionalSet<V> {
}
public RFuture<Boolean> addAsync(V value, long ttl, TimeUnit ttlUnit) {
return addAsync(value, new AddCacheOperation(set, value, ttl, ttlUnit, transactionId));
long threadId = Thread.currentThread().getId();
return addAsync(value, new AddCacheOperation(set, value, ttl, ttlUnit, transactionId, threadId));
}
@Override
protected TransactionalOperation createAddOperation(V value) {
return new AddCacheOperation(set, value, transactionId);
protected TransactionalOperation createAddOperation(V value, long threadId) {
return new AddCacheOperation(set, value, transactionId, threadId);
}
@Override
@ -76,8 +77,8 @@ public class TransactionalSetCache<V> extends BaseTransactionalSet<V> {
}
@Override
protected TransactionalOperation createRemoveOperation(Object value) {
return new RemoveCacheOperation(set, value, transactionId);
protected TransactionalOperation createRemoveOperation(Object value, long threadId) {
return new RemoveCacheOperation(set, value, transactionId, threadId);
}
@Override

@ -30,15 +30,17 @@ public class DeleteOperation extends TransactionalOperation {
private String lockName;
private String transactionId;
private long threadId;
public DeleteOperation(String name) {
this(name, null, null);
this(name, null, null, 0);
}
public DeleteOperation(String name, String lockName, String transactionId) {
public DeleteOperation(String name, String lockName, String transactionId, long threadId) {
super(name, null);
this.lockName = lockName;
this.transactionId = transactionId;
this.threadId = threadId;
}
@Override

@ -27,8 +27,8 @@ public class MapAddAndGetOperation extends MapOperation {
public MapAddAndGetOperation() {
}
public MapAddAndGetOperation(RMap<?, ?> map, Object key, Object value, String transactionId) {
super(map, key, value, transactionId);
public MapAddAndGetOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
}
@Override

@ -33,8 +33,8 @@ public class MapCacheFastPutIfAbsentOperation extends MapOperation {
private TimeUnit maxIdleUnit;
public MapCacheFastPutIfAbsentOperation(RMap<?, ?> map, Object key, Object value, long ttl, TimeUnit ttlUnit,
long maxIdleTime, TimeUnit maxIdleUnit, String transactionId) {
super(map, key, value, transactionId);
long maxIdleTime, TimeUnit maxIdleUnit, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
this.ttl = ttl;
this.ttlUnit = ttlUnit;
this.maxIdleTime = maxIdleTime;

@ -35,8 +35,8 @@ public class MapCacheFastPutOperation extends MapOperation {
public MapCacheFastPutOperation() {
}
public MapCacheFastPutOperation(RMap<?, ?> map, Object key, Object value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit, String transactionId) {
super(map, key, value, transactionId);
public MapCacheFastPutOperation(RMap<?, ?> map, Object key, Object value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
this.ttl = ttl;
this.ttlUnit = ttlUnit;
this.maxIdleTime = maxIdleTime;

@ -36,16 +36,16 @@ public class MapCachePutIfAbsentOperation extends MapOperation {
}
public MapCachePutIfAbsentOperation(RMap<?, ?> map, Object key, Object value,
long ttl, TimeUnit unit, long maxIdleTime, TimeUnit maxIdleUnit, String transactionId) {
this(map, key, value, transactionId);
long ttl, TimeUnit unit, long maxIdleTime, TimeUnit maxIdleUnit, String transactionId, long threadId) {
this(map, key, value, transactionId, threadId);
this.ttl = ttl;
this.unit = unit;
this.maxIdleTime = maxIdleTime;
this.maxIdleUnit = maxIdleUnit;
}
public MapCachePutIfAbsentOperation(RMap<?, ?> map, Object key, Object value, String transactionId) {
super(map, key, value, transactionId);
public MapCachePutIfAbsentOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
}
@Override

@ -35,8 +35,8 @@ public class MapCachePutOperation extends MapOperation {
public MapCachePutOperation() {
}
public MapCachePutOperation(RMap<?, ?> map, Object key, Object value, long ttlTimeout, TimeUnit ttlUnit, long maxIdleTimeout, TimeUnit maxIdleUnit, String transactionId) {
super(map, key, value, transactionId);
public MapCachePutOperation(RMap<?, ?> map, Object key, Object value, long ttlTimeout, TimeUnit ttlUnit, long maxIdleTimeout, TimeUnit maxIdleUnit, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
this.ttlTimeout = ttlTimeout;
this.ttlUnit = ttlUnit;
this.maxIdleTimeout = maxIdleTimeout;

@ -27,8 +27,8 @@ public class MapFastPutIfAbsentOperation extends MapOperation {
public MapFastPutIfAbsentOperation() {
}
public MapFastPutIfAbsentOperation(RMap<?, ?> map, Object key, Object value, String transactionId) {
super(map, key, value, transactionId);
public MapFastPutIfAbsentOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
}
@Override

@ -27,8 +27,8 @@ public class MapFastPutOperation extends MapOperation {
public MapFastPutOperation() {
}
public MapFastPutOperation(RMap<?, ?> map, Object key, Object value, String transactionId) {
super(map, key, value, transactionId);
public MapFastPutOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
}
@Override

@ -27,8 +27,8 @@ public class MapFastRemoveOperation extends MapOperation {
public MapFastRemoveOperation() {
}
public MapFastRemoveOperation(RMap<?, ?> map, Object key, String transactionId) {
super(map, key, null, transactionId);
public MapFastRemoveOperation(RMap<?, ?> map, Object key, String transactionId, long threadId) {
super(map, key, null, transactionId, threadId);
}
@Override

@ -36,21 +36,23 @@ public abstract class MapOperation extends TransactionalOperation {
Object oldValue;
RMap<?, ?> map;
String transactionId;
long threadId;
public MapOperation() {
}
public MapOperation(RMap<?, ?> map, Object key, Object value, String transactionId) {
this(map, key, value, null, transactionId);
public MapOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
this(map, key, value, null, transactionId, threadId);
}
public MapOperation(RMap<?, ?> map, Object key, Object value, Object oldValue, String transactionId) {
public MapOperation(RMap<?, ?> map, Object key, Object value, Object oldValue, String transactionId, long threadId) {
super(map.getName(), map.getCodec());
this.map = map;
this.key = key;
this.value = value;
this.oldValue = oldValue;
this.transactionId = transactionId;
this.threadId = threadId;
}
public Object getKey() {
@ -65,20 +67,20 @@ public abstract class MapOperation extends TransactionalOperation {
public final void commit(CommandAsyncExecutor commandExecutor) {
RMap<Object, Object> map = getMap(commandExecutor);
commit(map);
getLock(map, commandExecutor, key).unlockAsync();
getLock(map, commandExecutor, key).unlockAsync(threadId);
}
protected RMap<Object, Object> getMap(CommandAsyncExecutor commandExecutor) {
if (map instanceof RMapCache) {
return new RedissonMapCache<Object, Object>(codec, null, commandExecutor, name, null, null, null);
return new RedissonMapCache<>(codec, null, commandExecutor, name, null, null, null);
}
return new RedissonMap<Object, Object>(codec, commandExecutor, name, null, null, null);
return new RedissonMap<>(codec, commandExecutor, name, null, null, null);
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RMap<Object, Object> map = getMap(commandExecutor);
getLock(map, commandExecutor, key).unlockAsync();
getLock(map, commandExecutor, key).unlockAsync(threadId);
}
protected RLock getLock(RMap<?, ?> map, CommandAsyncExecutor commandExecutor, Object key) {

@ -27,8 +27,8 @@ public class MapPutIfAbsentOperation extends MapOperation {
public MapPutIfAbsentOperation() {
}
public MapPutIfAbsentOperation(RMap<?, ?> map, Object key, Object value, String transactionId) {
super(map, key, value, transactionId);
public MapPutIfAbsentOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
}
@Override

@ -27,8 +27,8 @@ public class MapPutOperation extends MapOperation {
public MapPutOperation() {
}
public MapPutOperation(RMap<?, ?> map, Object key, Object value, String transactionId) {
super(map, key, value, transactionId);
public MapPutOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
}
@Override

@ -27,12 +27,12 @@ public class MapRemoveOperation extends MapOperation {
public MapRemoveOperation() {
}
public MapRemoveOperation(RMap<?, ?> map, Object key, String transactionId) {
super(map, key, null, transactionId);
public MapRemoveOperation(RMap<?, ?> map, Object key, String transactionId, long threadId) {
super(map, key, null, transactionId, threadId);
}
public MapRemoveOperation(RMap<?, ?> map, Object key, Object value, String transactionId) {
super(map, key, value, transactionId);
public MapRemoveOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
}
@Override

@ -27,12 +27,12 @@ public class MapReplaceOperation extends MapOperation {
public MapReplaceOperation() {
}
public MapReplaceOperation(RMap<?, ?> map, Object key, Object value, Object oldValue, String transactionId) {
super(map, key, value, oldValue, transactionId);
public MapReplaceOperation(RMap<?, ?> map, Object key, Object value, Object oldValue, String transactionId, long threadId) {
super(map, key, value, oldValue, transactionId, threadId);
}
public MapReplaceOperation(RMap<?, ?> map, Object key, Object value, String transactionId) {
super(map, key, value, transactionId);
public MapReplaceOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
}
@Override

@ -33,37 +33,39 @@ public class AddCacheOperation extends SetOperation {
private Object value;
private long ttl;
private TimeUnit timeUnit;
private long threadId;
public AddCacheOperation(RObject set, Object value, String transactionId) {
this(set, value, 0, null, transactionId);
public AddCacheOperation(RObject set, Object value, String transactionId, long threadId) {
this(set, value, 0, null, transactionId, threadId);
}
public AddCacheOperation(RObject set, Object value, long ttl, TimeUnit timeUnit, String transactionId) {
this(set.getName(), set.getCodec(), value, ttl, timeUnit, transactionId);
public AddCacheOperation(RObject set, Object value, long ttl, TimeUnit timeUnit, String transactionId, long threadId) {
this(set.getName(), set.getCodec(), value, ttl, timeUnit, transactionId, threadId);
}
public AddCacheOperation(String name, Codec codec, Object value, long ttl, TimeUnit timeUnit, String transactionId) {
public AddCacheOperation(String name, Codec codec, Object value, long ttl, TimeUnit timeUnit, String transactionId, long threadId) {
super(name, codec, transactionId);
this.value = value;
this.timeUnit = timeUnit;
this.ttl = ttl;
this.threadId = threadId;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RSetCache<Object> set = new RedissonSetCache<Object>(codec, null, commandExecutor, name, null);
RSetCache<Object> set = new RedissonSetCache<>(codec, null, commandExecutor, name, null);
if (timeUnit != null) {
set.addAsync(value, ttl, timeUnit);
} else {
set.addAsync(value);
}
getLock(set, commandExecutor, value).unlockAsync();
getLock(set, commandExecutor, value).unlockAsync(threadId);
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RSetCache<Object> set = new RedissonSetCache<Object>(codec, null, commandExecutor, name, null);
getLock(set, commandExecutor, value).unlockAsync();
getLock(set, commandExecutor, value).unlockAsync(threadId);
}
public Object getValue() {

@ -29,14 +29,16 @@ import org.redisson.command.CommandAsyncExecutor;
public class AddOperation extends SetOperation {
private Object value;
private long threadId;
public AddOperation(RObject set, Object value, String transactionId) {
this(set.getName(), set.getCodec(), value, transactionId);
public AddOperation(RObject set, Object value, String transactionId, long threadId) {
this(set.getName(), set.getCodec(), value, transactionId, threadId);
}
public AddOperation(String name, Codec codec, Object value, String transactionId) {
public AddOperation(String name, Codec codec, Object value, String transactionId, long threadId) {
super(name, codec, transactionId);
this.value = value;
this.threadId = threadId;
}
@Override

@ -29,27 +29,29 @@ import org.redisson.command.CommandAsyncExecutor;
public class RemoveCacheOperation extends SetOperation {
private Object value;
public RemoveCacheOperation(RObject set, Object value, String transactionId) {
this(set.getName(), set.getCodec(), value, transactionId);
private long threadId;
public RemoveCacheOperation(RObject set, Object value, String transactionId, long threadId) {
this(set.getName(), set.getCodec(), value, transactionId, threadId);
}
public RemoveCacheOperation(String name, Codec codec, Object value, String transactionId) {
public RemoveCacheOperation(String name, Codec codec, Object value, String transactionId, long threadId) {
super(name, codec, transactionId);
this.value = value;
this.threadId = threadId;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RSetCache<Object> set = new RedissonSetCache<Object>(codec, null, commandExecutor, name, null);
set.removeAsync(value);
getLock(set, commandExecutor, value).unlockAsync();
getLock(set, commandExecutor, value).unlockAsync(threadId);
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RSetCache<Object> set = new RedissonSetCache<Object>(codec, null, commandExecutor, name, null);
getLock(set, commandExecutor, value).unlockAsync();
getLock(set, commandExecutor, value).unlockAsync(threadId);
}
public Object getValue() {

@ -29,27 +29,29 @@ import org.redisson.command.CommandAsyncExecutor;
public class RemoveOperation extends SetOperation {
private Object value;
private long threadId;
public RemoveOperation(RObject set, Object value, String transactionId) {
this(set.getName(), set.getCodec(), value, transactionId);
public RemoveOperation(RObject set, Object value, String transactionId, long threadId) {
this(set.getName(), set.getCodec(), value, transactionId, threadId);
}
public RemoveOperation(String name, Codec codec, Object value, String transactionId) {
public RemoveOperation(String name, Codec codec, Object value, String transactionId, long threadId) {
super(name, codec, transactionId);
this.value = value;
this.threadId = threadId;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RSet<Object> set = new RedissonSet<Object>(codec, commandExecutor, name, null);
RSet<Object> set = new RedissonSet<>(codec, commandExecutor, name, null);
set.removeAsync(value);
getLock(set, commandExecutor, value).unlockAsync();
getLock(set, commandExecutor, value).unlockAsync(threadId);
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
RSet<Object> set = new RedissonSet<Object>(codec, commandExecutor, name, null);
getLock(set, commandExecutor, value).unlockAsync();
RSet<Object> set = new RedissonSet<>(codec, commandExecutor, name, null);
getLock(set, commandExecutor, value).unlockAsync(threadId);
}
public Object getValue() {

Loading…
Cancel
Save