Fixed - RBuckets methods don't use nameMapper #4444

pull/4531/head
Nikita Koksharov 2 years ago
parent 7aa2775343
commit 6d525bf30e

@ -31,6 +31,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
*
@ -74,10 +75,14 @@ public class RedissonBuckets implements RBuckets {
if (keys.length == 0) {
return new CompletableFutureWrapper<>(Collections.emptyMap());
}
List<Object> keysList = Arrays.stream(keys)
.map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k))
.collect(Collectors.toList());
Codec commandCodec = new CompositeCodec(StringCodec.INSTANCE, codec, codec);
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("MGET", new MapGetAllDecoder(Arrays.<Object>asList(keys), 0));
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("MGET", new MapGetAllDecoder(keysList, 0));
return commandExecutor.readBatchedAsync(commandCodec, command, new SlotCallback<Map<Object, Object>, Map<String, V>>() {
final Map<String, V> results = new ConcurrentHashMap<>();
@ -85,7 +90,8 @@ public class RedissonBuckets implements RBuckets {
public void onSlotResult(Map<Object, Object> result) {
for (Map.Entry<Object, Object> entry : result.entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
results.put((String) entry.getKey(), (V) entry.getValue());
String key = commandExecutor.getConnectionManager().getConfig().getNameMapper().unmap((String) entry.getKey());
results.put(key, (V) entry.getValue());
}
}
}
@ -99,7 +105,7 @@ public class RedissonBuckets implements RBuckets {
public RedisCommand<Map<Object, Object>> createCommand(List<String> keys) {
return new RedisCommand<>("MGET", new BucketsDecoder(keys));
}
}, keys);
}, keysList.toArray(new String[0]));
}
@Override
@ -108,6 +114,8 @@ public class RedissonBuckets implements RBuckets {
return new CompletableFutureWrapper<>(false);
}
Map<String, ?> mappedBuckets = map(buckets);
return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSETNX, new SlotCallback<Boolean, Boolean>() {
final AtomicBoolean result = new AtomicBoolean(true);
@ -129,14 +137,20 @@ public class RedissonBuckets implements RBuckets {
for (String key : keys) {
params.add(key);
try {
params.add(codec.getValueEncoder().encode(buckets.get(key)));
params.add(codec.getValueEncoder().encode(mappedBuckets.get(key)));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
return params.toArray();
}
}, buckets.keySet().toArray(new String[]{}));
}, mappedBuckets.keySet().toArray(new String[]{}));
}
private Map<String, ?> map(Map<String, ?> buckets) {
return buckets.entrySet().stream().collect(
Collectors.toMap(e -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(e.getKey()),
e -> e.getValue()));
}
@Override
@ -145,6 +159,8 @@ public class RedissonBuckets implements RBuckets {
return new CompletableFutureWrapper<>((Void) null);
}
Map<String, ?> mappedBuckets = map(buckets);
return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new SlotCallback<Void, Void>() {
@Override
public void onSlotResult(Void result) {
@ -161,14 +177,14 @@ public class RedissonBuckets implements RBuckets {
for (String key : keys) {
params.add(key);
try {
params.add(codec.getValueEncoder().encode(buckets.get(key)));
params.add(codec.getValueEncoder().encode(mappedBuckets.get(key)));
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
return params.toArray();
}
}, buckets.keySet().toArray(new String[]{}));
}, mappedBuckets.keySet().toArray(new String[]{}));
}
}

@ -11,6 +11,7 @@ import java.util.Set;
import org.junit.jupiter.api.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.NameMapper;
import org.redisson.api.RBucket;
import org.redisson.api.RBuckets;
import org.redisson.api.RedissonClient;
@ -20,6 +21,63 @@ import org.redisson.connection.balancer.RandomLoadBalancer;
public class RedissonBucketsTest extends BaseTest {
@Test
public void testGetInClusterNameMapper() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setNameMapper(new NameMapper() {
@Override
public String map(String name) {
return "test::" + name;
}
@Override
public String unmap(String name) {
return name.replace("test::", "");
}
})
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
int size = 10000;
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put("test" + i, i);
}
for (int i = 10; i < size; i++) {
map.put("test" + i + "{" + (i%100)+ "}", i);
}
redisson.getBuckets().set(map);
Set<String> queryKeys = new HashSet<>(map.keySet());
queryKeys.add("test_invalid");
Map<String, Integer> buckets = redisson.getBuckets().get(queryKeys.toArray(new String[map.size()]));
assertThat(buckets).isEqualTo(map);
for (int i = 0; i < 10; i++) {
assertThat(redisson.getBucket("test" + i).get()).isEqualTo(i);
}
redisson.shutdown();
process.shutdown();
}
@Test
public void testGetInCluster() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();

Loading…
Cancel
Save