refactoring

pull/1461/head
Nikita
parent d88e2fb6c3
commit af51a3f2b8

@ -118,6 +118,7 @@ public class TransactionOptions {
}
/**
* If transaction hasn't been committed within <code>timeout</code> it will rollback automatically.
* Set <code>-1</code> to disable.
* <p>
* Default is <code>5000 milliseconds</code>
*
@ -126,6 +127,10 @@ public class TransactionOptions {
* @return self instance
*/
public TransactionOptions timeout(long timeout, TimeUnit timeoutUnit) {
if (timeout == -1) {
this.timeout = timeout;
return this;
}
this.timeout = timeoutUnit.toMillis(timeout);
return this;
}

@ -425,7 +425,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
private RFuture<Void> registerSentinel(final URI addr, final MasterSlaveServersConfig c) {
String key = addr.getHost() + ":" + addr.getPort();
final String key = addr.getHost() + ":" + addr.getPort();
RedisClient sentinel = sentinels.get(key);
if (sentinel != null) {
return RedissonPromise.newSucceededFuture(null);

@ -79,8 +79,8 @@ public class RedissonTransaction implements RTransaction {
private final AtomicBoolean executed = new AtomicBoolean();
private final TransactionOptions options;
private final List<TransactionalOperation> operations = new ArrayList<TransactionalOperation>();
private final Set<String> localCaches = new HashSet<String>();
private List<TransactionalOperation> operations = new ArrayList<TransactionalOperation>();
private Set<String> localCaches = new HashSet<String>();
private final long startTime = System.currentTimeMillis();
public RedissonTransaction(CommandAsyncExecutor commandExecutor, TransactionOptions options) {
@ -89,6 +89,15 @@ public class RedissonTransaction implements RTransaction {
this.commandExecutor = commandExecutor;
}
public RedissonTransaction(CommandAsyncExecutor commandExecutor, TransactionOptions options,
List<TransactionalOperation> operations, Set<String> localCaches) {
super();
this.commandExecutor = commandExecutor;
this.options = options;
this.operations = operations;
this.localCaches = localCaches;
}
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(RLocalCachedMap<K, V> fromInstance) {
checkState();
@ -183,7 +192,7 @@ public class RedissonTransaction implements RTransaction {
final String id = generateId();
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Map<HashKey, HashValue>> future = disableLocalCacheAsync(id);
RFuture<Map<HashKey, HashValue>> future = disableLocalCacheAsync(id, localCaches, operations);
future.addListener(new FutureListener<Map<HashKey, HashValue>>() {
@Override
public void operationComplete(Future<Map<HashKey, HashValue>> future) throws Exception {
@ -224,7 +233,6 @@ public class RedissonTransaction implements RTransaction {
}
enableLocalCacheAsync(id, hashes);
operations.clear();
executed.set(true);
result.trySuccess(null);
@ -234,9 +242,13 @@ public class RedissonTransaction implements RTransaction {
});
return result;
}
@Override
public void commit() {
commit(localCaches, operations);
}
public void commit(Set<String> localCaches, List<TransactionalOperation> operations) {
checkState();
checkTimeout();
@ -248,7 +260,7 @@ public class RedissonTransaction implements RTransaction {
}
String id = generateId();
Map<HashKey, HashValue> hashes = disableLocalCache(id);
Map<HashKey, HashValue> hashes = disableLocalCache(id, localCaches, operations);
try {
checkTimeout();
@ -278,12 +290,11 @@ public class RedissonTransaction implements RTransaction {
enableLocalCache(id, hashes);
operations.clear();
executed.set(true);
}
private void checkTimeout() {
if (System.currentTimeMillis() - startTime > options.getTimeout()) {
if (options.getTimeout() != -1 && System.currentTimeMillis() - startTime > options.getTimeout()) {
throw new TransactionTimeoutException("Transaction was discarded due to timeout " + options.getTimeout() + " milliseconds");
}
}
@ -324,7 +335,7 @@ public class RedissonTransaction implements RTransaction {
}
}
private Map<HashKey, HashValue> disableLocalCache(String requestId) {
private Map<HashKey, HashValue> disableLocalCache(String requestId, Set<String> localCaches, List<TransactionalOperation> operations) {
if (localCaches.isEmpty()) {
return Collections.emptyMap();
}
@ -420,7 +431,7 @@ public class RedissonTransaction implements RTransaction {
return hashes;
}
private RFuture<Map<HashKey, HashValue>> disableLocalCacheAsync(final String requestId) {
private RFuture<Map<HashKey, HashValue>> disableLocalCacheAsync(final String requestId, Set<String> localCaches, List<TransactionalOperation> operations) {
if (localCaches.isEmpty()) {
return RedissonPromise.newSucceededFuture(Collections.<HashKey, HashValue>emptyMap());
}
@ -559,9 +570,13 @@ public class RedissonTransaction implements RTransaction {
PlatformDependent.threadLocalRandom().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
@Override
public void rollback() {
rollback(operations);
}
public void rollback(List<TransactionalOperation> operations) {
checkState();
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
@ -605,6 +620,14 @@ public class RedissonTransaction implements RTransaction {
});
return result;
}
public Set<String> getLocalCaches() {
return localCaches;
}
public List<TransactionalOperation> getOperations() {
return operations;
}
protected void checkState() {
if (executed.get()) {

@ -27,7 +27,7 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class DeleteOperation extends TransactionalOperation {
private final String lockName;
private String lockName;
public DeleteOperation(String name) {
this(name, null);
@ -55,5 +55,9 @@ public class DeleteOperation extends TransactionalOperation {
lock.unlockAsync();
}
}
public String getLockName() {
return lockName;
}
}

@ -27,7 +27,7 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class TouchOperation extends TransactionalOperation {
private final String lockName;
private String lockName;
public TouchOperation(String name) {
this(name, null);
@ -51,5 +51,9 @@ public class TouchOperation extends TransactionalOperation {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
}
public String getLockName() {
return lockName;
}
}

@ -25,8 +25,11 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public abstract class TransactionalOperation {
protected final Codec codec;
protected final String name;
protected Codec codec;
protected String name;
public TransactionalOperation() {
}
public TransactionalOperation(String name, Codec codec) {
this.name = name;

@ -27,7 +27,7 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class UnlinkOperation extends TransactionalOperation {
private final String lockName;
private String lockName;
public UnlinkOperation(String name) {
this(name, null);
@ -55,5 +55,9 @@ public class UnlinkOperation extends TransactionalOperation {
lock.unlockAsync();
}
}
public String getLockName() {
return lockName;
}
}

@ -29,9 +29,9 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class BucketCompareAndSetOperation<V> extends TransactionalOperation {
private final V expected;
private final V value;
private final String lockName;
private V expected;
private V value;
private String lockName;
public BucketCompareAndSetOperation(String name, String lockName, Codec codec, V expected, V value) {
super(name, codec);
@ -53,5 +53,17 @@ public class BucketCompareAndSetOperation<V> extends TransactionalOperation {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
}
public V getExpected() {
return expected;
}
public V getValue() {
return value;
}
public String getLockName() {
return lockName;
}
}

@ -29,7 +29,7 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class BucketGetAndDeleteOperation<V> extends TransactionalOperation {
private final String lockName;
private String lockName;
public BucketGetAndDeleteOperation(String name, String lockName, Codec codec) {
super(name, codec);
@ -49,5 +49,9 @@ public class BucketGetAndDeleteOperation<V> extends TransactionalOperation {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
}
public String getLockName() {
return lockName;
}
}

@ -29,8 +29,8 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class BucketGetAndSetOperation<V> extends TransactionalOperation {
private final Object value;
private final String lockName;
private Object value;
private String lockName;
public BucketGetAndSetOperation(String name, String lockName, Codec codec, Object value) {
super(name, codec);
@ -51,5 +51,13 @@ public class BucketGetAndSetOperation<V> extends TransactionalOperation {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
}
public Object getValue() {
return value;
}
public String getLockName() {
return lockName;
}
}

@ -31,11 +31,11 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class BucketSetOperation<V> extends TransactionalOperation {
private final Object value;
private final String lockName;
private Object value;
private String lockName;
private long timeToLive;
private TimeUnit timeUnit;
public BucketSetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit) {
this(name, lockName, codec, value);
this.timeToLive = timeToLive;
@ -65,5 +65,21 @@ public class BucketSetOperation<V> extends TransactionalOperation {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
}
public Object getValue() {
return value;
}
public String getLockName() {
return lockName;
}
public long getTimeToLive() {
return timeToLive;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
}

@ -31,11 +31,11 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class BucketTrySetOperation<V> extends TransactionalOperation {
private final Object value;
private final String lockName;
private Object value;
private String lockName;
private long timeToLive;
private TimeUnit timeUnit;
public BucketTrySetOperation(String name, String lockName, Codec codec, Object value, long timeToLive, TimeUnit timeUnit) {
this(name, lockName, codec, value);
this.timeToLive = timeToLive;
@ -65,5 +65,21 @@ public class BucketTrySetOperation<V> extends TransactionalOperation {
RedissonLock lock = new RedissonLock(commandExecutor, lockName);
lock.unlockAsync();
}
public Object getValue() {
return value;
}
public String getLockName() {
return lockName;
}
public long getTimeToLive() {
return timeToLive;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
}

@ -24,6 +24,9 @@ import org.redisson.api.RMap;
*/
public class MapAddAndGetOperation extends MapOperation {
public MapAddAndGetOperation() {
}
public MapAddAndGetOperation(RMap<?, ?> map, Object key, Object value) {
super(map, key, value);
}

@ -31,7 +31,7 @@ public class MapCacheFastPutIfAbsentOperation extends MapOperation {
private TimeUnit ttlUnit;
private long maxIdleTime;
private TimeUnit maxIdleUnit;
public MapCacheFastPutIfAbsentOperation(RMap<?, ?> map, Object key, Object value, long ttl, TimeUnit ttlUnit,
long maxIdleTime, TimeUnit maxIdleUnit) {
super(map, key, value);
@ -46,4 +46,20 @@ public class MapCacheFastPutIfAbsentOperation extends MapOperation {
((RMapCache<Object, Object>)map).fastPutIfAbsentAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit);
}
public long getTTL() {
return ttl;
}
public TimeUnit getTTLUnit() {
return ttlUnit;
}
public long getMaxIdleTime() {
return maxIdleTime;
}
public TimeUnit getMaxIdleUnit() {
return maxIdleUnit;
}
}

@ -32,6 +32,9 @@ public class MapCacheFastPutOperation extends MapOperation {
private long maxIdleTime;
private TimeUnit maxIdleUnit;
public MapCacheFastPutOperation() {
}
public MapCacheFastPutOperation(RMap<?, ?> map, Object key, Object value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
super(map, key, value);
this.ttl = ttl;
@ -45,4 +48,20 @@ public class MapCacheFastPutOperation extends MapOperation {
((RMapCache<Object, Object>)map).fastPutAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit);
}
public long getTTL() {
return ttl;
}
public TimeUnit getTTLUnit() {
return ttlUnit;
}
public TimeUnit getMaxIdleUnit() {
return maxIdleUnit;
}
public long getMaxIdleTime() {
return maxIdleTime;
}
}

@ -32,6 +32,9 @@ public class MapCachePutIfAbsentOperation extends MapOperation {
private long maxIdleTime;
private TimeUnit maxIdleUnit;
public MapCachePutIfAbsentOperation() {
}
public MapCachePutIfAbsentOperation(RMap<?, ?> map, Object key, Object value,
long ttl, TimeUnit unit, long maxIdleTime, TimeUnit maxIdleUnit) {
this(map, key, value);
@ -50,4 +53,20 @@ public class MapCachePutIfAbsentOperation extends MapOperation {
((RMapCache<Object, Object>)map).putIfAbsentAsync(key, value, ttl, unit, maxIdleTime, maxIdleUnit);
}
public long getTTL() {
return ttl;
}
public TimeUnit getTTLUnit() {
return unit;
}
public long getMaxIdleTime() {
return maxIdleTime;
}
public TimeUnit getMaxIdleUnit() {
return maxIdleUnit;
}
}

@ -32,6 +32,9 @@ public class MapCachePutOperation extends MapOperation {
private long maxIdleTimeout;
private TimeUnit maxIdleUnit;
public MapCachePutOperation() {
}
public MapCachePutOperation(RMap<?, ?> map, Object key, Object value, long ttlTimeout, TimeUnit ttlUnit, long maxIdleTimeout, TimeUnit maxIdleUnit) {
super(map, key, value);
this.ttlTimeout = ttlTimeout;
@ -45,4 +48,20 @@ public class MapCachePutOperation extends MapOperation {
((RMapCache<Object, Object>)map).putAsync(key, value, ttlTimeout, ttlUnit, maxIdleTimeout, maxIdleUnit);
}
public long getTTL() {
return ttlTimeout;
}
public TimeUnit getTTLUnit() {
return ttlUnit;
}
public long getMaxIdleTimeout() {
return maxIdleTimeout;
}
public TimeUnit getMaxIdleUnit() {
return maxIdleUnit;
}
}

@ -24,6 +24,9 @@ import org.redisson.api.RMap;
*/
public class MapFastPutIfAbsentOperation extends MapOperation {
public MapFastPutIfAbsentOperation() {
}
public MapFastPutIfAbsentOperation(RMap<?, ?> map, Object key, Object value) {
super(map, key, value);
}

@ -24,6 +24,9 @@ import org.redisson.api.RMap;
*/
public class MapFastPutOperation extends MapOperation {
public MapFastPutOperation() {
}
public MapFastPutOperation(RMap<?, ?> map, Object key, Object value) {
super(map, key, value);
}

@ -24,6 +24,9 @@ import org.redisson.api.RMap;
*/
public class MapFastRemoveOperation extends MapOperation {
public MapFastRemoveOperation() {
}
public MapFastRemoveOperation(RMap<?, ?> map, Object key) {
super(map, key, null);
}

@ -29,10 +29,13 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public abstract class MapOperation extends TransactionalOperation {
final Object key;
final Object value;
final Object oldValue;
final RMap<?, ?> map;
Object key;
Object value;
Object oldValue;
RMap<?, ?> map;
public MapOperation() {
}
public MapOperation(RMap<?, ?> map, Object key, Object value) {
this(map, key, value, null);
@ -76,5 +79,11 @@ public abstract class MapOperation extends TransactionalOperation {
protected abstract void commit(RMap<Object, Object> map);
public Object getValue() {
return value;
}
public Object getOldValue() {
return oldValue;
}
}

@ -24,6 +24,9 @@ import org.redisson.api.RMap;
*/
public class MapPutIfAbsentOperation extends MapOperation {
public MapPutIfAbsentOperation() {
}
public MapPutIfAbsentOperation(RMap<?, ?> map, Object key, Object value) {
super(map, key, value);
}

@ -24,6 +24,9 @@ import org.redisson.api.RMap;
*/
public class MapPutOperation extends MapOperation {
public MapPutOperation() {
}
public MapPutOperation(RMap<?, ?> map, Object key, Object value) {
super(map, key, value);
}

@ -24,6 +24,9 @@ import org.redisson.api.RMap;
*/
public class MapRemoveOperation extends MapOperation {
public MapRemoveOperation() {
}
public MapRemoveOperation(RMap<?, ?> map, Object key) {
super(map, key, null);
}

@ -24,6 +24,9 @@ import org.redisson.api.RMap;
*/
public class MapReplaceOperation extends MapOperation {
public MapReplaceOperation() {
}
public MapReplaceOperation(RMap<?, ?> map, Object key, Object value, Object oldValue) {
super(map, key, value, oldValue);
}

@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.RedissonSetCache;
import org.redisson.api.RObject;
import org.redisson.api.RSetCache;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -30,22 +31,25 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class AddCacheOperation extends TransactionalOperation {
final Object value;
final long ttl;
final TimeUnit timeUnit;
private Object value;
private long ttl;
private TimeUnit timeUnit;
public AddCacheOperation(RObject set, Object value) {
this(set, value, 0, null);
}
public AddCacheOperation(RObject set, Object value, long ttl, TimeUnit timeUnit) {
super(set.getName(), set.getCodec());
this(set.getName(), set.getCodec(), value, ttl, timeUnit);
}
public AddCacheOperation(String name, Codec codec, Object value, long ttl, TimeUnit timeUnit) {
super(name, codec);
this.value = value;
this.timeUnit = timeUnit;
this.ttl = ttl;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RSetCache<Object> set = new RedissonSetCache<Object>(codec, null, commandExecutor, name, null);
@ -63,4 +67,16 @@ public class AddCacheOperation extends TransactionalOperation {
set.getLock(value).unlockAsync();
}
public Object getValue() {
return value;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
public long getTTL() {
return ttl;
}
}

@ -18,6 +18,7 @@ package org.redisson.transaction.operation.set;
import org.redisson.RedissonSet;
import org.redisson.api.RObject;
import org.redisson.api.RSet;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -28,10 +29,14 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class AddOperation extends TransactionalOperation {
final Object value;
private Object value;
public AddOperation(RObject set, Object value) {
super(set.getName(), set.getCodec());
this(set.getName(), set.getCodec(), value);
}
public AddOperation(String name, Codec codec, Object value) {
super(name, codec);
this.value = value;
}
@ -48,4 +53,8 @@ public class AddOperation extends TransactionalOperation {
set.getLock(value).unlockAsync();
}
public Object getValue() {
return value;
}
}

@ -18,6 +18,7 @@ package org.redisson.transaction.operation.set;
import org.redisson.RedissonSet;
import org.redisson.api.RObject;
import org.redisson.api.RSet;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -28,12 +29,16 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class MoveOperation extends TransactionalOperation {
final String destinationName;
final Object value;
final long threadId;
private String destinationName;
private Object value;
private long threadId;
public MoveOperation(RObject set, String destinationName, long threadId, Object value) {
super(set.getName(), set.getCodec());
this(set.getName(), set.getCodec(), destinationName, threadId, value);
}
public MoveOperation(String name, Codec codec, String destinationName, long threadId, Object value) {
super(name, codec);
this.destinationName = destinationName;
this.value = value;
this.threadId = threadId;
@ -55,5 +60,17 @@ public class MoveOperation extends TransactionalOperation {
destinationSet.getLock(value).unlockAsync(threadId);
set.getLock(value).unlockAsync(threadId);
}
public String getDestinationName() {
return destinationName;
}
public Object getValue() {
return value;
}
public long getThreadId() {
return threadId;
}
}

@ -18,6 +18,7 @@ package org.redisson.transaction.operation.set;
import org.redisson.RedissonSetCache;
import org.redisson.api.RObject;
import org.redisson.api.RSetCache;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -28,10 +29,14 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class RemoveCacheOperation extends TransactionalOperation {
final Object value;
private Object value;
public RemoveCacheOperation(RObject set, Object value) {
super(set.getName(), set.getCodec());
this(set.getName(), set.getCodec(), value);
}
public RemoveCacheOperation(String name, Codec codec, Object value) {
super(name, codec);
this.value = value;
}
@ -47,5 +52,9 @@ public class RemoveCacheOperation extends TransactionalOperation {
RSetCache<Object> set = new RedissonSetCache<Object>(codec, null, commandExecutor, name, null);
set.getLock(value).unlockAsync();
}
public Object getValue() {
return value;
}
}

@ -18,6 +18,7 @@ package org.redisson.transaction.operation.set;
import org.redisson.RedissonSet;
import org.redisson.api.RObject;
import org.redisson.api.RSet;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
@ -28,10 +29,14 @@ import org.redisson.transaction.operation.TransactionalOperation;
*/
public class RemoveOperation extends TransactionalOperation {
final Object value;
private Object value;
public RemoveOperation(RObject set, Object value) {
super(set.getName(), set.getCodec());
this(set.getName(), set.getCodec(), value);
}
public RemoveOperation(String name, Codec codec, Object value) {
super(name, codec);
this.value = value;
}
@ -48,4 +53,8 @@ public class RemoveOperation extends TransactionalOperation {
set.getLock(value).unlockAsync();
}
public Object getValue() {
return value;
}
}

Loading…
Cancel
Save