Improvement - RBuckets.get() should group keys by slot in Redis Cluster mode. #3813

pull/3826/head
Nikita Koksharov 4 years ago
parent cb828969a9
commit 4ca0fae6a5

@ -15,15 +15,6 @@
*/
package org.redisson;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.redisson.api.RBuckets;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
@ -36,6 +27,11 @@ import org.redisson.connection.decoder.BucketsDecoder;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.misc.RedissonPromise;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @author Nikita Koksharov
@ -45,7 +41,7 @@ public class RedissonBuckets implements RBuckets {
protected final Codec codec;
protected final CommandAsyncExecutor commandExecutor;
public RedissonBuckets(CommandAsyncExecutor commandExecutor) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor);
}
@ -83,7 +79,7 @@ public class RedissonBuckets implements RBuckets {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("MGET", new MapGetAllDecoder(Arrays.<Object>asList(keys), 0));
return commandExecutor.readBatchedAsync(commandCodec, command, new SlotCallback<Map<Object, Object>, Map<String, V>>() {
Map<String, V> results = new ConcurrentHashMap<>();
final Map<String, V> results = new ConcurrentHashMap<>();
@Override
public void onSlotResult(Map<Object, Object> result) {
@ -100,8 +96,8 @@ public class RedissonBuckets implements RBuckets {
}
@Override
public RedisCommand<Map<Object, Object>> createCommand(Object param) {
return new RedisCommand<Map<Object, Object>>("MGET", new BucketsDecoder(param.toString()));
public RedisCommand<Map<Object, Object>> createCommand(List<String> keys) {
return new RedisCommand<>("MGET", new BucketsDecoder(keys));
}
}, keys);
}

@ -17,6 +17,8 @@ package org.redisson;
import org.redisson.client.protocol.RedisCommand;
import java.util.List;
/**
*
* @author Nikita Koksharov
@ -26,7 +28,7 @@ import org.redisson.client.protocol.RedisCommand;
*/
public interface SlotCallback<T, R> {
default RedisCommand<T> createCommand(Object param) {
default RedisCommand<T> createCommand(List<String> param) {
return null;
}

@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
*
@ -603,16 +604,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return writeAsync((String) null, codec, command, keys);
}
Map<MasterSlaveEntry, List<String>> range2key = new HashMap<>();
for (String key : keys) {
int slot = connectionManager.calcSlot(key);
MasterSlaveEntry entry = connectionManager.getEntry(slot);
List<String> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
list.add(key);
}
Map<MasterSlaveEntry, Map<Integer, List<String>>> entry2keys = Arrays.stream(keys).collect(
Collectors.groupingBy(k -> {
int slot = connectionManager.calcSlot(k);
return connectionManager.getEntry(slot);
}, Collectors.groupingBy(k -> {
return connectionManager.calcSlot(k);
}, Collectors.toList())));
long total = entry2keys.values().stream().mapToInt(m -> m.size()).sum();
RPromise<R> result = new RedissonPromise<>();
AtomicLong executed = new AtomicLong(keys.length);
AtomicLong executed = new AtomicLong(total);
AtomicReference<Throwable> failed = new AtomicReference<>();
BiConsumer<T, Throwable> listener = (res, ex) -> {
if (ex != null) {
@ -632,7 +635,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
};
for (Entry<MasterSlaveEntry, List<String>> entry : range2key.entrySet()) {
for (Entry<MasterSlaveEntry, Map<Integer, List<String>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error
CommandBatchService executorService;
if (this instanceof CommandBatchService) {
@ -641,17 +644,17 @@ public class CommandAsyncService implements CommandAsyncExecutor {
executorService = new CommandBatchService(this);
}
for (String key : entry.getValue()) {
for (List<String> groupedKeys : entry.getValue().values()) {
RedisCommand<T> c = command;
RedisCommand<T> newCommand = callback.createCommand(key);
RedisCommand<T> newCommand = callback.createCommand(groupedKeys);
if (newCommand != null) {
c = newCommand;
}
if (readOnly) {
RFuture<T> f = executorService.readAsync(entry.getKey(), codec, c, key);
RFuture<T> f = executorService.readAsync(entry.getKey(), codec, c, groupedKeys.toArray());
f.onComplete(listener);
} else {
RFuture<T> f = executorService.writeAsync(entry.getKey(), codec, c, key);
RFuture<T> f = executorService.writeAsync(entry.getKey(), codec, c, groupedKeys.toArray());
f.onComplete(listener);
}
}

@ -18,7 +18,6 @@ package org.redisson.connection.decoder;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -30,19 +29,19 @@ import java.util.Map;
*/
public class BucketsDecoder implements MultiDecoder<Map<Object, Object>> {
private final String key;
private final List<String> keys;
public BucketsDecoder(String key) {
this.key = key;
public BucketsDecoder(List<String> keys) {
this.keys = keys;
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return new HashMap<Object, Object>();
Map<Object, Object> result = new HashMap<>();
for (int i = 0; i < parts.size(); i++) {
result.put(keys.get(i), parts.get(i));
}
return Collections.singletonMap(key, parts.get(0));
return result;
}
}

@ -43,10 +43,14 @@ public class RedissonBucketsTest extends BaseTest {
int size = 10000;
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < size; i++) {
for (int i = 0; i < 10; i++) {
map.put("test" + i, i);
redisson.getBucket("test" + i).set(i);
}
for (int i = 10; i < size; i++) {
map.put("test" + i + "{" + (i%100)+ "}", i);
redisson.getBucket("test" + i + "{" + (i%100)+ "}").set(i);
}
Set<String> queryKeys = new HashSet<>(map.keySet());
queryKeys.add("test_invalid");

Loading…
Cancel
Save