Fixed - RTransaction should be executed in IN_MEMORY_ATOMIC mode #3119

pull/3123/head
Nikita Koksharov 4 years ago
parent 796335b7fc
commit adf8990356

@ -304,7 +304,7 @@ public class RedissonTransaction implements RTransaction {
return RedissonPromise.newSucceededFuture(null);
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
RedissonBatch publishBatch = createBatch();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX);
RTopicAsync topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE);
@ -320,7 +320,7 @@ public class RedissonTransaction implements RTransaction {
return;
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
RedissonBatch publishBatch = createBatch();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX);
RTopicAsync topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE);
@ -341,7 +341,7 @@ public class RedissonTransaction implements RTransaction {
}
Map<HashKey, HashValue> hashes = new HashMap<>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
RedissonBatch batch = createBatch();
for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) {
MapOperation mapOperation = (MapOperation) transactionalOperation;
@ -387,7 +387,7 @@ public class RedissonTransaction implements RTransaction {
});
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
RedissonBatch publishBatch = createBatch();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec());
@ -435,7 +435,7 @@ public class RedissonTransaction implements RTransaction {
RPromise<Map<HashKey, HashValue>> result = new RedissonPromise<>();
Map<HashKey, HashValue> hashes = new HashMap<>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
RedissonBatch batch = createBatch();
for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) {
MapOperation mapOperation = (MapOperation) transactionalOperation;
@ -491,7 +491,7 @@ public class RedissonTransaction implements RTransaction {
}
subscriptionFuture.onComplete((r, ex) -> {
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
RedissonBatch publishBatch = createBatch();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec());
@ -538,7 +538,12 @@ public class RedissonTransaction implements RTransaction {
return result;
}
private RedissonBatch createBatch() {
return new RedissonBatch(null, commandExecutor.getConnectionManager(),
BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.IN_MEMORY_ATOMIC));
}
protected static String generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);

Loading…
Cancel
Save