commitAsync and rollbackAsync methods added to RTransaction

pull/1423/head
Nikita
parent 3fd1015f20
commit d5b930618c

@ -104,6 +104,25 @@ public class RedissonTopic<M> implements RTopic<M> {
PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
return addListener(pubSubListener);
}
@Override
public RFuture<Integer> addListenerAsync(final MessageListener<M> listener) {
PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener);
RPromise<Integer> result = new RedissonPromise<Integer>();
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
result.trySuccess(System.identityHashCode(pubSubListener));
}
});
return result;
}
private int addListener(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener);

@ -15,6 +15,8 @@
*/
package org.redisson.api;
import org.redisson.api.listener.MessageListener;
/**
* Distributed topic. Messages are delivered to all message listeners across Redis cluster.
*
@ -28,8 +30,19 @@ public interface RTopicAsync<M> {
* Publish the message to all subscribers of this topic asynchronously
*
* @param message to send
* @return the <code>RFuture</code> object with number of clients that received the message
* @return number of clients that received the message
*/
RFuture<Long> publishAsync(M message);
/**
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> is called when any message
* is published on this topic.
*
* @param listener for messages
* @return locally unique listener id
* @see org.redisson.api.listener.MessageListener
*/
RFuture<Integer> addListenerAsync(MessageListener<M> listener);
}

@ -157,10 +157,20 @@ public interface RTransaction {
* Commits all changes made on this transaction.
*/
void commit();
/**
* Commits all changes made on this transaction in async mode.
*/
RFuture<Void> commitAsync();
/**
* Rollback all changes made on this transaction.
*/
void rollback();
/**
* Rollback all changes made on this transaction in async mode.
*/
RFuture<Void> rollbackAsync();
}

@ -42,6 +42,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;

@ -44,6 +44,7 @@ import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;

@ -33,10 +33,10 @@ import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.CountableListener;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SlaveConnectionPool;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;

@ -13,12 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
package org.redisson.misc;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -51,6 +49,15 @@ public class CountableListener<T> implements FutureListener<Object> {
counter.incrementAndGet();
}
public void decCounter() {
if (counter.decrementAndGet() == 0) {
onSuccess(value);
if (result != null) {
result.trySuccess(value);
}
}
}
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
@ -59,13 +66,8 @@ public class CountableListener<T> implements FutureListener<Object> {
}
return;
}
if (counter.decrementAndGet() == 0) {
onSuccess(value);
if (result != null) {
result.trySuccess(value);
}
}
decCounter();
}
protected void onSuccess(T value) {

@ -166,8 +166,6 @@ public class BaseTransactionalMap<K, V> {
result.trySuccess(exists);
}
});
result.trySuccess(null);
return result;
}
@ -536,13 +534,14 @@ public class BaseTransactionalMap<K, V> {
return;
}
for (K key : keys) {
for (K key : future.getNow().keySet()) {
HashValue keyHash = toKeyHash(key);
operations.add(new MapFastRemoveOperation(map, key));
counter.incrementAndGet();
state.put(keyHash, MapEntry.NULL);
}
result.trySuccess(null);
result.trySuccess(counter.get());
}
});
}

@ -0,0 +1,53 @@
package org.redisson.transaction;
import org.redisson.client.codec.Codec;
/**
*
* @author Nikita Koksharov
*
*/
public class HashKey {
final Codec codec;
final String name;
public HashKey(String name, Codec codec) {
this.name = name;
this.codec = codec;
}
public Codec getCodec() {
return codec;
}
public String getName() {
return name;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
HashKey other = (HashKey) obj;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
return true;
}
}

@ -0,0 +1,28 @@
package org.redisson.transaction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author Nikita Koksharov
*
*/
public class HashValue {
private final AtomicInteger counter = new AtomicInteger();
private final List<byte[]> keyIds = new ArrayList<byte[]>();
public HashValue() {
}
public AtomicInteger getCounter() {
return counter;
}
public List<byte[]> getKeyIds() {
return keyIds;
}
}

@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -33,6 +34,7 @@ import org.redisson.RedissonLocalCachedMap;
import org.redisson.RedissonObject;
import org.redisson.RedissonTopic;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
@ -54,10 +56,15 @@ import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.map.MapOperation;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
@ -162,6 +169,72 @@ public class RedissonTransaction implements RTransaction {
return new RedissonTransactionalMapCache<K, V>(codec, commandExecutor, name, operations, options.getTimeout(), executed);
}
@Override
public RFuture<Void> commitAsync() {
checkState();
checkTimeout();
CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager());
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.commit(transactionExecutor);
}
String id = generateId();
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Map<HashKey, HashValue>> future = disableLocalCacheAsync(id);
future.addListener(new FutureListener<Map<HashKey, HashValue>>() {
@Override
public void operationComplete(Future<Map<HashKey, HashValue>> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(new TransactionException("Unable to execute transaction", future.cause()));
return;
}
Map<HashKey, HashValue> hashes = future.getNow();
try {
checkTimeout();
} catch (TransactionTimeoutException e) {
enableLocalCacheAsync(id, hashes);
result.tryFailure(e);
return;
}
int syncSlaves = 0;
if (!commandExecutor.getConnectionManager().isClusterMode()) {
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next();
syncSlaves = entry.getAvailableClients() - 1;
}
BatchOptions batchOptions = BatchOptions.defaults()
.syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS)
.responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS)
.retryAttempts(options.getRetryAttempts())
.retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS)
.atomic();
RFuture<Object> transactionFuture = transactionExecutor.executeAsync(batchOptions);
transactionFuture.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(new TransactionException("Unable to execute transaction", future.cause()));
return;
}
enableLocalCacheAsync(id, hashes);
operations.clear();
executed.set(true);
result.trySuccess(null);
}
});
}
});
return result;
}
@Override
public void commit() {
checkState();
@ -175,7 +248,7 @@ public class RedissonTransaction implements RTransaction {
}
String id = generateId();
Map<TransactionalOperation, List<byte[]>> hashes = disableLocalCache(id);
Map<HashKey, HashValue> hashes = disableLocalCache(id);
try {
checkTimeout();
@ -205,6 +278,7 @@ public class RedissonTransaction implements RTransaction {
enableLocalCache(id, hashes);
operations.clear();
executed.set(true);
}
@ -214,16 +288,32 @@ public class RedissonTransaction implements RTransaction {
}
}
private void enableLocalCache(String requestId, Map<TransactionalOperation, List<byte[]>> hashes) {
private RFuture<BatchResult<?>> enableLocalCacheAsync(String requestId, Map<HashKey, HashValue> hashes) {
if (hashes.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX);
RTopicAsync<Object> topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE);
LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]));
topic.publishAsync(msg);
}
return publishBatch.executeAsync();
}
private void enableLocalCache(String requestId, Map<HashKey, HashValue> hashes) {
if (hashes.isEmpty()) {
return;
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (Entry<TransactionalOperation, List<byte[]>> entry : hashes.entrySet()) {
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX);
RTopicAsync<Object> topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE);
LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().toArray(new byte[entry.getValue().size()][]));
LocalCachedMapEnable msg = new LocalCachedMapEnable(requestId, entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]));
topic.publishAsync(msg);
}
@ -232,28 +322,28 @@ public class RedissonTransaction implements RTransaction {
} catch (Exception e) {
// skip it. Disabled local cache entries are enabled once reach timeout.
}
}
private Map<TransactionalOperation, List<byte[]>> disableLocalCache(String requestId) {
private Map<HashKey, HashValue> disableLocalCache(String requestId) {
if (localCaches.isEmpty()) {
return Collections.emptyMap();
}
Map<TransactionalOperation, List<byte[]>> hashes = new HashMap<TransactionalOperation, List<byte[]>>(localCaches.size());
Map<HashKey, HashValue> hashes = new HashMap<HashKey, HashValue>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) {
MapOperation mapOperation = (MapOperation) transactionalOperation;
RedissonLocalCachedMap<?, ?> map = (RedissonLocalCachedMap<?, ?>)mapOperation.getMap();
HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec());
byte[] key = map.toCacheKey(mapOperation.getKey()).getKeyHash();
List<byte[]> list = hashes.get(transactionalOperation);
if (list == null) {
list = new ArrayList<byte[]>();
hashes.put(transactionalOperation, list);
HashValue value = hashes.get(hashKey);
if (value == null) {
value = new HashValue();
hashes.put(hashKey, value);
}
list.add(key);
value.getKeyIds().add(key);
String disabledKeysName = RedissonObject.suffixName(transactionalOperation.getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = batch.getListMultimapCache(disabledKeysName, transactionalOperation.getCodec());
@ -269,18 +359,16 @@ public class RedissonTransaction implements RTransaction {
throw new TransactionException("Unable to execute transaction over local cached map objects: " + localCaches, e);
}
final Map<String, AtomicInteger> map = new HashMap<String, AtomicInteger>();
final CountDownLatch latch = new CountDownLatch(hashes.size());
List<RTopic<Object>> topics = new ArrayList<RTopic<Object>>();
for (final Entry<TransactionalOperation, List<byte[]>> entry : hashes.entrySet()) {
for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {
RTopic<Object> topic = new RedissonTopic<Object>(LocalCachedMessageCodec.INSTANCE,
commandExecutor, RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX));
topics.add(topic);
map.put(entry.getKey().getName(), new AtomicInteger());
topic.addListener(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
AtomicInteger counter = map.get(entry.getKey().getName());
AtomicInteger counter = entry.getValue().getCounter();
if (counter.decrementAndGet() == 0) {
latch.countDown();
}
@ -289,7 +377,7 @@ public class RedissonTransaction implements RTransaction {
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (final Entry<TransactionalOperation, List<byte[]>> entry : hashes.entrySet()) {
for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec());
LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout());
@ -297,7 +385,7 @@ public class RedissonTransaction implements RTransaction {
RTopicAsync<Object> topic = publishBatch.getTopic(RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX), LocalCachedMessageCodec.INSTANCE);
RFuture<Long> future = topic.publishAsync(new LocalCachedMapDisable(requestId,
entry.getValue().toArray(new byte[entry.getValue().size()][]), options.getResponseTimeout()));
entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]), options.getResponseTimeout()));
future.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
@ -306,7 +394,7 @@ public class RedissonTransaction implements RTransaction {
}
int receivers = future.getNow().intValue();
AtomicInteger counter = map.get(entry.getKey().getName());
AtomicInteger counter = entry.getValue().getCounter();
if (counter.addAndGet(receivers) == 0) {
latch.countDown();
}
@ -332,6 +420,139 @@ public class RedissonTransaction implements RTransaction {
return hashes;
}
private RFuture<Map<HashKey, HashValue>> disableLocalCacheAsync(String requestId) {
if (localCaches.isEmpty()) {
return RedissonPromise.newSucceededFuture(Collections.emptyMap());
}
RPromise<Map<HashKey, HashValue>> result = new RedissonPromise<Map<HashKey, HashValue>>();
Map<HashKey, HashValue> hashes = new HashMap<HashKey, HashValue>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) {
MapOperation mapOperation = (MapOperation) transactionalOperation;
RedissonLocalCachedMap<?, ?> map = (RedissonLocalCachedMap<?, ?>)mapOperation.getMap();
HashKey hashKey = new HashKey(transactionalOperation.getName(), transactionalOperation.getCodec());
byte[] key = map.toCacheKey(mapOperation.getKey()).getKeyHash();
HashValue value = hashes.get(hashKey);
if (value == null) {
value = new HashValue();
hashes.put(hashKey, value);
}
value.getKeyIds().add(key);
String disabledKeysName = RedissonObject.suffixName(transactionalOperation.getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = batch.getListMultimapCache(disabledKeysName, transactionalOperation.getCodec());
LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout());
multimap.putAsync(localCacheKey, ByteBufUtil.hexDump(key));
multimap.expireKeyAsync(localCacheKey, options.getResponseTimeout(), TimeUnit.MILLISECONDS);
}
}
RFuture<BatchResult<?>> batchListener = batch.executeAsync();
batchListener.addListener(new FutureListener<BatchResult<?>>() {
@Override
public void operationComplete(Future<BatchResult<?>> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
CountableListener<Map<HashKey, HashValue>> listener =
new CountableListener<Map<HashKey, HashValue>>(result, hashes);
listener.setCounter(hashes.size());
RPromise<Void> subscriptionFuture = new RedissonPromise<Void>();
CountableListener<Void> subscribedFutures = new CountableListener<Void>(subscriptionFuture, null);
subscribedFutures.setCounter(hashes.size());
List<RTopic<Object>> topics = new ArrayList<RTopic<Object>>();
for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {
final String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX);
RTopic<Object> topic = new RedissonTopic<Object>(LocalCachedMessageCodec.INSTANCE,
commandExecutor, disabledAckName);
topics.add(topic);
RFuture<Integer> topicFuture = topic.addListenerAsync(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
AtomicInteger counter = entry.getValue().getCounter();
if (counter.decrementAndGet() == 0) {
listener.decCounter();
}
}
});
topicFuture.addListener(new FutureListener<Integer>() {
@Override
public void operationComplete(Future<Integer> future) throws Exception {
subscribedFutures.decCounter();
}
});
}
subscriptionFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (final Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec());
LocalCachedMapDisabledKey localCacheKey = new LocalCachedMapDisabledKey(requestId, options.getResponseTimeout());
multimap.removeAllAsync(localCacheKey);
RTopicAsync<Object> topic = publishBatch.getTopic(RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX), LocalCachedMessageCodec.INSTANCE);
RFuture<Long> publishFuture = topic.publishAsync(new LocalCachedMapDisable(requestId,
entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]), options.getResponseTimeout()));
publishFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
int receivers = future.getNow().intValue();
AtomicInteger counter = entry.getValue().getCounter();
if (counter.addAndGet(receivers) == 0) {
listener.decCounter();
}
}
});
}
RFuture<BatchResult<?>> publishFuture = publishBatch.executeAsync();
publishFuture.addListener(new FutureListener<BatchResult<?>>() {
@Override
public void operationComplete(Future<BatchResult<?>> future) throws Exception {
result.addListener(new FutureListener<Map<HashKey, HashValue>>() {
@Override
public void operationComplete(Future<Map<HashKey, HashValue>> future)
throws Exception {
for (RTopic<Object> topic : topics) {
topic.removeAllListeners();
}
}
});
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
result.tryFailure(new TransactionTimeoutException("Unable to execute transaction within " + options.getResponseTimeout() + "ms"));
}
}, options.getResponseTimeout(), TimeUnit.MILLISECONDS);
}
});
}
});
}
});
return result;
}
protected static String generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
@ -351,12 +572,39 @@ public class RedissonTransaction implements RTransaction {
try {
executorService.execute(BatchOptions.defaults());
} catch (Exception e) {
throw new TransactionException("Unable to execute transaction", e);
throw new TransactionException("Unable to rollback transaction", e);
}
operations.clear();
executed.set(true);
}
@Override
public RFuture<Void> rollbackAsync() {
checkState();
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.rollback(executorService);
}
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Object> future = executorService.executeAsync(BatchOptions.defaults());
future.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(new TransactionException("Unable to rollback transaction", future.cause()));
return;
}
operations.clear();
executed.set(true);
result.trySuccess(null);
}
});
return result;
}
protected void checkState() {
if (executed.get()) {

@ -155,8 +155,6 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
result.trySuccess(future.getNow());
}
});
result.trySuccess(null);
}
});
return result;
@ -193,8 +191,6 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
result.trySuccess(future.getNow());
}
});
result.trySuccess(null);
}
});
return result;
@ -231,8 +227,6 @@ public class RedissonTransactionalBucket<V> extends RedissonBucket<V> {
result.trySuccess(future.getNow());
}
});
result.trySuccess(null);
}
});
return result;

Loading…
Cancel
Save