refactoring

pull/5300/head
Nikita Koksharov 2 years ago
parent bfdf77f4ab
commit 365424cb77

@ -102,10 +102,10 @@ public class RedissonBuckets implements RBuckets {
} }
@Override @Override
public RedisCommand<Map<Object, Object>> createCommand(List<String> keys) { public RedisCommand<Map<Object, Object>> createCommand(List<Object> keys) {
return new RedisCommand<>("MGET", new BucketsDecoder(keys)); return new RedisCommand<>("MGET", new BucketsDecoder(keys));
} }
}, keysList.toArray(new String[0])); }, keysList.toArray(new Object[0]));
} }
@Override @Override
@ -132,9 +132,9 @@ public class RedissonBuckets implements RBuckets {
} }
@Override @Override
public Object[] createParams(List<String> keys) { public Object[] createParams(List<Object> keys) {
List<Object> params = new ArrayList<>(keys.size()); List<Object> params = new ArrayList<>(keys.size());
for (String key : keys) { for (Object key : keys) {
params.add(key); params.add(key);
try { try {
params.add(codec.getValueEncoder().encode(mappedBuckets.get(key))); params.add(codec.getValueEncoder().encode(mappedBuckets.get(key)));
@ -144,7 +144,7 @@ public class RedissonBuckets implements RBuckets {
} }
return params.toArray(); return params.toArray();
} }
}, mappedBuckets.keySet().toArray(new String[]{})); }, mappedBuckets.keySet().toArray(new Object[0]));
} }
private Map<String, ?> map(Map<String, ?> buckets) { private Map<String, ?> map(Map<String, ?> buckets) {
@ -172,9 +172,9 @@ public class RedissonBuckets implements RBuckets {
} }
@Override @Override
public Object[] createParams(List<String> keys) { public Object[] createParams(List<Object> keys) {
List<Object> params = new ArrayList<>(keys.size()); List<Object> params = new ArrayList<>(keys.size());
for (String key : keys) { for (Object key : keys) {
params.add(key); params.add(key);
try { try {
params.add(codec.getValueEncoder().encode(mappedBuckets.get(key))); params.add(codec.getValueEncoder().encode(mappedBuckets.get(key)));
@ -184,7 +184,7 @@ public class RedissonBuckets implements RBuckets {
} }
return params.toArray(); return params.toArray();
} }
}, mappedBuckets.keySet().toArray(new String[]{})); }, mappedBuckets.keySet().toArray(new Object[0]));
} }
} }

@ -28,11 +28,11 @@ import java.util.List;
*/ */
public interface SlotCallback<T, R> { public interface SlotCallback<T, R> {
default RedisCommand<T> createCommand(List<String> params) { default RedisCommand<T> createCommand(List<Object> params) {
return null; return null;
} }
default Object[] createParams(List<String> params) { default Object[] createParams(List<Object> params) {
return params.toArray(); return params.toArray();
} }

@ -15,6 +15,7 @@
*/ */
package org.redisson.cluster; package org.redisson.cluster;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.concurrent.ScheduledFuture;
@ -792,6 +793,25 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result; return result;
} }
@Override
public int calcSlot(ByteBuf key) {
if (key == null) {
return 0;
}
int start = key.indexOf(key.readerIndex(), key.readerIndex() + key.readableBytes(), (byte) '{');
if (start != -1) {
int end = key.indexOf(start + 1, key.readerIndex() + key.readableBytes(), (byte) '}');
if (end != -1 && start + 1 < end) {
key = key.slice(start + 1, end-start - 1);
}
}
int result = CRC16.crc16(key) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}
@Override @Override
public int calcSlot(String key) { public int calcSlot(String key) {
if (key == null) { if (key == null) {

@ -67,6 +67,8 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params);
@ -91,14 +93,20 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params); <T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalReadAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params); <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteAsync(ByteBuf key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteNoRetryAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params); <T, R> RFuture<R> evalWriteNoRetryAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params); <T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params);
@ -126,9 +134,11 @@ public interface CommandAsyncExecutor {
ByteBuf encodeMapValue(Codec codec, Object value); ByteBuf encodeMapValue(Codec codec, Object value);
<T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys); <T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... keys);
<T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... keys);
<T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys); <T, R> RFuture<R> evalWriteBatchedAsync(Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback);
boolean isEvalShaROSupported(); boolean isEvalShaROSupported();

@ -309,6 +309,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return new NodeSource(slot); return new NodeSource(slot);
} }
private NodeSource getNodeSource(ByteBuf key) {
int slot = connectionManager.calcSlot(key);
return new NodeSource(slot);
}
@Override @Override
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
NodeSource source = getNodeSource(key); NodeSource source = getNodeSource(key);
@ -321,6 +326,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return async(true, source, codec, command, params, false, false); return async(true, source, codec, command, params, false, false);
} }
@Override
public <T, R> RFuture<R> readAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object... params) {
NodeSource source = getNodeSource(key);
return async(true, source, codec, command, params, false, false);
}
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
return async(true, new NodeSource(entry), codec, command, params, false, false); return async(true, new NodeSource(entry), codec, command, params, false, false);
} }
@ -358,12 +369,23 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, false, params); return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, false, params);
} }
@Override
public <T, R> RFuture<R> evalReadAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(entry, client), true, codec, evalCommandType, script, keys, false, params);
}
@Override @Override
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) { public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = getNodeSource(key); NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, false, params); return evalAsync(source, false, codec, evalCommandType, script, keys, false, params);
} }
@Override
public <T, R> RFuture<R> evalWriteAsync(ByteBuf key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, false, params);
}
@Override @Override
public <T, R> RFuture<R> evalWriteNoRetryAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) { public <T, R> RFuture<R> evalWriteNoRetryAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = getNodeSource(key); NodeSource source = getNodeSource(key);
@ -504,6 +526,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return async(false, source, codec, command, params, false, false); return async(false, source, codec, command, params, false, false);
} }
@Override
public <T, R> RFuture<R> writeAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object... params) {
NodeSource source = getNodeSource(key);
return async(false, source, codec, command, params, false, false);
}
private final AtomicBoolean sortRoSupported = new AtomicBoolean(true); private final AtomicBoolean sortRoSupported = new AtomicBoolean(true);
public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec,
@ -543,16 +571,95 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
@Override @Override
public <T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys) { public <T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... keys) {
return executeBatchedAsync(true, codec, command, callback, keys); return executeBatchedAsync(true, codec, command, callback, keys);
} }
@Override @Override
public <T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys) { public <T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... keys) {
return executeBatchedAsync(false, codec, command, callback, keys); return executeBatchedAsync(false, codec, command, callback, keys);
} }
private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String[] keys) { @Override
public <T, R> RFuture<R> evalWriteBatchedAsync(Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback) {
return evalWriteBatchedAsync(false, codec, command, script, keys, callback);
}
private <T, R> RFuture<R> evalWriteBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback) {
if (!connectionManager.isClusterMode()) {
Object[] keysArray = callback.createParams(keys);
Object[] paramsArray = callback.createParams(null);
if (readOnly) {
return evalReadAsync((String) null, codec, command, script, Arrays.asList(keysArray), paramsArray);
}
return evalWriteAsync((String) null, codec, command, script, Arrays.asList(keysArray), paramsArray);
}
Map<MasterSlaveEntry, Map<Integer, List<Object>>> entry2keys = keys.stream().collect(
Collectors.groupingBy(k -> {
int slot;
if (k instanceof String) {
slot = connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
slot = connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
return connectionManager.getWriteEntry(slot);
}, Collectors.groupingBy(k -> {
if (k instanceof String) {
return connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
return connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
}, Collectors.toList())));
List<CompletableFuture<?>> futures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error
CommandBatchService executorService;
if (this instanceof CommandBatchService) {
executorService = (CommandBatchService) this;
} else {
executorService = new CommandBatchService(this);
}
for (List<Object> groupedKeys : entry.getValue().values()) {
RedisCommand<T> c = command;
RedisCommand<T> newCommand = callback.createCommand(groupedKeys);
if (newCommand != null) {
c = newCommand;
}
Object[] keysArray = callback.createParams(groupedKeys);
Object[] paramsArray = callback.createParams(null);
if (readOnly) {
RFuture<T> f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.add(f.toCompletableFuture());
} else {
RFuture<T> f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.add(f.toCompletableFuture());
}
}
if (!(this instanceof CommandBatchService)) {
executorService.executeAsync();
}
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> result = future.thenApply(r -> {
futures.forEach(f -> {
callback.onSlotResult((T) f.join());
});
return callback.onFinish();
});
return new CompletableFutureWrapper<>(result);
}
private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object[] keys) {
if (!connectionManager.isClusterMode()) { if (!connectionManager.isClusterMode()) {
Object[] params = callback.createParams(Arrays.asList(keys)); Object[] params = callback.createParams(Arrays.asList(keys));
if (readOnly) { if (readOnly) {
@ -561,16 +668,29 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return writeAsync((String) null, codec, command, params); return writeAsync((String) null, codec, command, params);
} }
Map<MasterSlaveEntry, Map<Integer, List<String>>> entry2keys = Arrays.stream(keys).collect( Map<MasterSlaveEntry, Map<Integer, List<Object>>> entry2keys = Arrays.stream(keys).collect(
Collectors.groupingBy(k -> { Collectors.groupingBy(k -> {
int slot = connectionManager.calcSlot(k); int slot;
if (k instanceof String) {
slot = connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
slot = connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
return connectionManager.getWriteEntry(slot); return connectionManager.getWriteEntry(slot);
}, Collectors.groupingBy(k -> { }, Collectors.groupingBy(k -> {
return connectionManager.calcSlot(k); if (k instanceof String) {
return connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
return connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
}, Collectors.toList()))); }, Collectors.toList())));
List<CompletableFuture<?>> futures = new ArrayList<>(); List<CompletableFuture<?>> futures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<String>>> entry : entry2keys.entrySet()) { for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error // executes in batch due to CROSSLOT error
CommandBatchService executorService; CommandBatchService executorService;
if (this instanceof CommandBatchService) { if (this instanceof CommandBatchService) {
@ -579,7 +699,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
executorService = new CommandBatchService(this); executorService = new CommandBatchService(this);
} }
for (List<String> groupedKeys : entry.getValue().values()) { for (List<Object> groupedKeys : entry.getValue().values()) {
RedisCommand<T> c = command; RedisCommand<T> c = command;
RedisCommand<T> newCommand = callback.createCommand(groupedKeys); RedisCommand<T> newCommand = callback.createCommand(groupedKeys);
if (newCommand != null) { if (newCommand != null) {

@ -15,6 +15,8 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import io.netty.buffer.ByteBuf;
/** /**
* @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a> * @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
**/ **/
@ -55,4 +57,14 @@ public final class CRC16 {
return crc & 0xFFFF; return crc & 0xFFFF;
} }
public static int crc16(ByteBuf bytes) {
int crc = 0x0000;
for (int i = 0; i < bytes.readableBytes(); i++) {
byte b = bytes.getByte(bytes.readerIndex() + i);
crc = (crc << 8) ^ LOOKUP_TABLE[((crc >>> 8) ^ (b & 0xFF)) & 0xFF];
}
return crc & 0xFFFF;
}
} }

@ -15,6 +15,7 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import io.netty.buffer.ByteBuf;
import org.redisson.api.NodeType; import org.redisson.api.NodeType;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
@ -41,6 +42,8 @@ public interface ConnectionManager {
int calcSlot(String key); int calcSlot(String key);
int calcSlot(ByteBuf key);
int calcSlot(byte[] key); int calcSlot(byte[] key);
Collection<MasterSlaveEntry> getEntrySet(); Collection<MasterSlaveEntry> getEntrySet();

@ -15,6 +15,7 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import io.netty.buffer.ByteBuf;
import org.redisson.api.NodeType; import org.redisson.api.NodeType;
import org.redisson.client.*; import org.redisson.client.*;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
@ -400,6 +401,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return singleSlotRange.getStartSlot(); return singleSlotRange.getStartSlot();
} }
@Override
public int calcSlot(ByteBuf key) {
return singleSlotRange.getStartSlot();
}
@Override @Override
public MasterSlaveEntry getEntry(InetSocketAddress address) { public MasterSlaveEntry getEntry(InetSocketAddress address) {
lazyConnect(); lazyConnect();

@ -29,9 +29,9 @@ import java.util.Map;
*/ */
public class BucketsDecoder implements MultiDecoder<Map<Object, Object>> { public class BucketsDecoder implements MultiDecoder<Map<Object, Object>> {
private final List<String> keys; private final List<Object> keys;
public BucketsDecoder(List<String> keys) { public BucketsDecoder(List<Object> keys) {
this.keys = keys; this.keys = keys;
} }

Loading…
Cancel
Save