From 6d525bf30e53b5379054fdb35ddff71715c45ee0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 23 Aug 2022 14:14:09 +0300 Subject: [PATCH] Fixed - RBuckets methods don't use nameMapper #4444 --- .../java/org/redisson/RedissonBuckets.java | 32 +++++++--- .../org/redisson/RedissonBucketsTest.java | 58 +++++++++++++++++++ 2 files changed, 82 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBuckets.java b/redisson/src/main/java/org/redisson/RedissonBuckets.java index 2d4494abe..224115084 100644 --- a/redisson/src/main/java/org/redisson/RedissonBuckets.java +++ b/redisson/src/main/java/org/redisson/RedissonBuckets.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; /** * @@ -74,10 +75,14 @@ public class RedissonBuckets implements RBuckets { if (keys.length == 0) { return new CompletableFutureWrapper<>(Collections.emptyMap()); } - + + List keysList = Arrays.stream(keys) + .map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k)) + .collect(Collectors.toList()); + Codec commandCodec = new CompositeCodec(StringCodec.INSTANCE, codec, codec); - RedisCommand> command = new RedisCommand>("MGET", new MapGetAllDecoder(Arrays.asList(keys), 0)); + RedisCommand> command = new RedisCommand>("MGET", new MapGetAllDecoder(keysList, 0)); return commandExecutor.readBatchedAsync(commandCodec, command, new SlotCallback, Map>() { final Map results = new ConcurrentHashMap<>(); @@ -85,7 +90,8 @@ public class RedissonBuckets implements RBuckets { public void onSlotResult(Map result) { for (Map.Entry entry : result.entrySet()) { if (entry.getKey() != null && entry.getValue() != null) { - results.put((String) entry.getKey(), (V) entry.getValue()); + String key = commandExecutor.getConnectionManager().getConfig().getNameMapper().unmap((String) entry.getKey()); + results.put(key, (V) entry.getValue()); } } } @@ -99,7 +105,7 @@ public class RedissonBuckets implements RBuckets { public RedisCommand> createCommand(List keys) { return new RedisCommand<>("MGET", new BucketsDecoder(keys)); } - }, keys); + }, keysList.toArray(new String[0])); } @Override @@ -108,6 +114,8 @@ public class RedissonBuckets implements RBuckets { return new CompletableFutureWrapper<>(false); } + Map mappedBuckets = map(buckets); + return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSETNX, new SlotCallback() { final AtomicBoolean result = new AtomicBoolean(true); @@ -129,14 +137,20 @@ public class RedissonBuckets implements RBuckets { for (String key : keys) { params.add(key); try { - params.add(codec.getValueEncoder().encode(buckets.get(key))); + params.add(codec.getValueEncoder().encode(mappedBuckets.get(key))); } catch (IOException e) { throw new IllegalArgumentException(e); } } return params.toArray(); } - }, buckets.keySet().toArray(new String[]{})); + }, mappedBuckets.keySet().toArray(new String[]{})); + } + + private Map map(Map buckets) { + return buckets.entrySet().stream().collect( + Collectors.toMap(e -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(e.getKey()), + e -> e.getValue())); } @Override @@ -145,6 +159,8 @@ public class RedissonBuckets implements RBuckets { return new CompletableFutureWrapper<>((Void) null); } + Map mappedBuckets = map(buckets); + return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new SlotCallback() { @Override public void onSlotResult(Void result) { @@ -161,14 +177,14 @@ public class RedissonBuckets implements RBuckets { for (String key : keys) { params.add(key); try { - params.add(codec.getValueEncoder().encode(buckets.get(key))); + params.add(codec.getValueEncoder().encode(mappedBuckets.get(key))); } catch (IOException e) { throw new IllegalArgumentException(e); } } return params.toArray(); } - }, buckets.keySet().toArray(new String[]{})); + }, mappedBuckets.keySet().toArray(new String[]{})); } } diff --git a/redisson/src/test/java/org/redisson/RedissonBucketsTest.java b/redisson/src/test/java/org/redisson/RedissonBucketsTest.java index 999cee830..9129f5453 100644 --- a/redisson/src/test/java/org/redisson/RedissonBucketsTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBucketsTest.java @@ -11,6 +11,7 @@ import java.util.Set; import org.junit.jupiter.api.Test; import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.api.NameMapper; import org.redisson.api.RBucket; import org.redisson.api.RBuckets; import org.redisson.api.RedissonClient; @@ -20,6 +21,63 @@ import org.redisson.connection.balancer.RandomLoadBalancer; public class RedissonBucketsTest extends BaseTest { + @Test + public void testGetInClusterNameMapper() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setNameMapper(new NameMapper() { + @Override + public String map(String name) { + return "test::" + name; + } + + @Override + public String unmap(String name) { + return name.replace("test::", ""); + } + }) + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + int size = 10000; + Map map = new HashMap<>(); + for (int i = 0; i < 10; i++) { + map.put("test" + i, i); + } + for (int i = 10; i < size; i++) { + map.put("test" + i + "{" + (i%100)+ "}", i); + } + + redisson.getBuckets().set(map); + + Set queryKeys = new HashSet<>(map.keySet()); + queryKeys.add("test_invalid"); + Map buckets = redisson.getBuckets().get(queryKeys.toArray(new String[map.size()])); + + assertThat(buckets).isEqualTo(map); + + for (int i = 0; i < 10; i++) { + assertThat(redisson.getBucket("test" + i).get()).isEqualTo(i); + } + + redisson.shutdown(); + process.shutdown(); + } + @Test public void testGetInCluster() throws FailedToStartRedisException, IOException, InterruptedException { RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();