From adf8990356c86d1aa066d1ad125eb05f0037037b Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 12 Oct 2020 09:36:31 +0300 Subject: [PATCH] Fixed - RTransaction should be executed in IN_MEMORY_ATOMIC mode #3119 --- .../transaction/RedissonTransaction.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index 15ea0a15f..9a2314518 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -304,7 +304,7 @@ public class RedissonTransaction implements RTransaction { return RedissonPromise.newSucceededFuture(null); } - RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); + RedissonBatch publishBatch = createBatch(); for (Entry entry : hashes.entrySet()) { String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX); RTopicAsync topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE); @@ -320,7 +320,7 @@ public class RedissonTransaction implements RTransaction { return; } - RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); + RedissonBatch publishBatch = createBatch(); for (Entry entry : hashes.entrySet()) { String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX); RTopicAsync topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE); @@ -341,7 +341,7 @@ public class RedissonTransaction implements RTransaction { } Map hashes = new HashMap<>(localCaches.size()); - RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); + RedissonBatch batch = createBatch(); for (TransactionalOperation transactionalOperation : operations) { if (localCaches.contains(transactionalOperation.getName())) { MapOperation mapOperation = (MapOperation) transactionalOperation; @@ -387,7 +387,7 @@ public class RedissonTransaction implements RTransaction { }); } - RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); + RedissonBatch publishBatch = createBatch(); for (Entry entry : hashes.entrySet()) { String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); RMultimapCacheAsync multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec()); @@ -435,7 +435,7 @@ public class RedissonTransaction implements RTransaction { RPromise> result = new RedissonPromise<>(); Map hashes = new HashMap<>(localCaches.size()); - RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); + RedissonBatch batch = createBatch(); for (TransactionalOperation transactionalOperation : operations) { if (localCaches.contains(transactionalOperation.getName())) { MapOperation mapOperation = (MapOperation) transactionalOperation; @@ -491,7 +491,7 @@ public class RedissonTransaction implements RTransaction { } subscriptionFuture.onComplete((r, ex) -> { - RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); + RedissonBatch publishBatch = createBatch(); for (Entry entry : hashes.entrySet()) { String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); RMultimapCacheAsync multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec()); @@ -538,7 +538,12 @@ public class RedissonTransaction implements RTransaction { return result; } - + + private RedissonBatch createBatch() { + return new RedissonBatch(null, commandExecutor.getConnectionManager(), + BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.IN_MEMORY_ATOMIC)); + } + protected static String generateId() { byte[] id = new byte[16]; ThreadLocalRandom.current().nextBytes(id);