diff --git a/redisson/src/main/java/org/redisson/RedissonBuckets.java b/redisson/src/main/java/org/redisson/RedissonBuckets.java index daa7f3445..fb44cb681 100644 --- a/redisson/src/main/java/org/redisson/RedissonBuckets.java +++ b/redisson/src/main/java/org/redisson/RedissonBuckets.java @@ -102,10 +102,10 @@ public class RedissonBuckets implements RBuckets { } @Override - public RedisCommand> createCommand(List keys) { + public RedisCommand> createCommand(List keys) { return new RedisCommand<>("MGET", new BucketsDecoder(keys)); } - }, keysList.toArray(new String[0])); + }, keysList.toArray(new Object[0])); } @Override @@ -132,9 +132,9 @@ public class RedissonBuckets implements RBuckets { } @Override - public Object[] createParams(List keys) { + public Object[] createParams(List keys) { List params = new ArrayList<>(keys.size()); - for (String key : keys) { + for (Object key : keys) { params.add(key); try { params.add(codec.getValueEncoder().encode(mappedBuckets.get(key))); @@ -144,7 +144,7 @@ public class RedissonBuckets implements RBuckets { } return params.toArray(); } - }, mappedBuckets.keySet().toArray(new String[]{})); + }, mappedBuckets.keySet().toArray(new Object[0])); } private Map map(Map buckets) { @@ -172,9 +172,9 @@ public class RedissonBuckets implements RBuckets { } @Override - public Object[] createParams(List keys) { + public Object[] createParams(List keys) { List params = new ArrayList<>(keys.size()); - for (String key : keys) { + for (Object key : keys) { params.add(key); try { params.add(codec.getValueEncoder().encode(mappedBuckets.get(key))); @@ -184,7 +184,7 @@ public class RedissonBuckets implements RBuckets { } return params.toArray(); } - }, mappedBuckets.keySet().toArray(new String[]{})); + }, mappedBuckets.keySet().toArray(new Object[0])); } } diff --git a/redisson/src/main/java/org/redisson/SlotCallback.java b/redisson/src/main/java/org/redisson/SlotCallback.java index 153c0ae77..14d6bc822 100644 --- a/redisson/src/main/java/org/redisson/SlotCallback.java +++ b/redisson/src/main/java/org/redisson/SlotCallback.java @@ -28,11 +28,11 @@ import java.util.List; */ public interface SlotCallback { - default RedisCommand createCommand(List params) { + default RedisCommand createCommand(List params) { return null; } - default Object[] createParams(List params) { + default Object[] createParams(List params) { return params.toArray(); } diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 543651fba..45a02214b 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -15,6 +15,7 @@ */ package org.redisson.cluster; +import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; @@ -791,7 +792,26 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { int result = CRC16.crc16(key) % MAX_SLOT; return result; } - + + @Override + public int calcSlot(ByteBuf key) { + if (key == null) { + return 0; + } + + int start = key.indexOf(key.readerIndex(), key.readerIndex() + key.readableBytes(), (byte) '{'); + if (start != -1) { + int end = key.indexOf(start + 1, key.readerIndex() + key.readableBytes(), (byte) '}'); + if (end != -1 && start + 1 < end) { + key = key.slice(start + 1, end-start - 1); + } + } + + int result = CRC16.crc16(key) % MAX_SLOT; + log.debug("slot {} for {}", result, key); + return result; + } + @Override public int calcSlot(String key) { if (key == null) { diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index 46ef09c7a..49d584c8c 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -66,13 +66,15 @@ public interface CommandAsyncExecutor { RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params); RFuture writeAsync(byte[] key, Codec codec, RedisCommand command, Object... params); - + + RFuture writeAsync(ByteBuf key, Codec codec, RedisCommand command, Object... params); + RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params); - + RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object... params); - + RFuture readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand command, Object... params); - + RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object... params); List> executeAllAsync(MasterSlaveEntry entry, RedisCommand command, Object... params); @@ -90,15 +92,21 @@ public interface CommandAsyncExecutor { RFuture evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); RFuture evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); - + + RFuture evalReadAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); + RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); + RFuture evalWriteAsync(ByteBuf key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); + RFuture evalWriteNoRetryAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); RFuture evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); - + RFuture readAsync(byte[] key, Codec codec, RedisCommand command, Object... params); - + + RFuture readAsync(ByteBuf key, Codec codec, RedisCommand command, Object... params); + RFuture readAsync(String key, Codec codec, RedisCommand command, Object... params); RFuture writeAsync(String key, Codec codec, RedisCommand command, Object... params); @@ -110,13 +118,13 @@ public interface CommandAsyncExecutor { RFuture readAsync(String key, RedisCommand command, Object... params); RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params); - + RFuture readRandomAsync(Codec codec, RedisCommand command, Object... params); - + RFuture readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params); RFuture async(boolean readOnlyMode, NodeSource source, Codec codec, - RedisCommand command, Object[] params, boolean ignoreRedirect, boolean noRetry); + RedisCommand command, Object[] params, boolean ignoreRedirect, boolean noRetry); RFuture pollFromAnyAsync(String name, Codec codec, RedisCommand command, long secondsTimeout, String... queueNames); @@ -126,9 +134,11 @@ public interface CommandAsyncExecutor { ByteBuf encodeMapValue(Codec codec, Object value); - RFuture readBatchedAsync(Codec codec, RedisCommand command, SlotCallback callback, String... keys); + RFuture readBatchedAsync(Codec codec, RedisCommand command, SlotCallback callback, Object... keys); + + RFuture writeBatchedAsync(Codec codec, RedisCommand command, SlotCallback callback, Object... keys); - RFuture writeBatchedAsync(Codec codec, RedisCommand command, SlotCallback callback, String... keys); + RFuture evalWriteBatchedAsync(Codec codec, RedisCommand command, String script, List keys, SlotCallback callback); boolean isEvalShaROSupported(); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index c57d8568d..a646c91d8 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -308,7 +308,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { int slot = connectionManager.calcSlot(key); return new NodeSource(slot); } - + + private NodeSource getNodeSource(ByteBuf key) { + int slot = connectionManager.calcSlot(key); + return new NodeSource(slot); + } + @Override public RFuture readAsync(String key, Codec codec, RedisCommand command, Object... params) { NodeSource source = getNodeSource(key); @@ -321,6 +326,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { return async(true, source, codec, command, params, false, false); } + @Override + public RFuture readAsync(ByteBuf key, Codec codec, RedisCommand command, Object... params) { + NodeSource source = getNodeSource(key); + return async(true, source, codec, command, params, false, false); + } + public RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { return async(true, new NodeSource(entry), codec, command, params, false, false); } @@ -358,12 +369,23 @@ public class CommandAsyncService implements CommandAsyncExecutor { return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, false, params); } + @Override + public RFuture evalReadAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { + return evalAsync(new NodeSource(entry, client), true, codec, evalCommandType, script, keys, false, params); + } + @Override public RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { NodeSource source = getNodeSource(key); return evalAsync(source, false, codec, evalCommandType, script, keys, false, params); } + @Override + public RFuture evalWriteAsync(ByteBuf key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { + NodeSource source = getNodeSource(key); + return evalAsync(source, false, codec, evalCommandType, script, keys, false, params); + } + @Override public RFuture evalWriteNoRetryAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { NodeSource source = getNodeSource(key); @@ -504,6 +526,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { return async(false, source, codec, command, params, false, false); } + @Override + public RFuture writeAsync(ByteBuf key, Codec codec, RedisCommand command, Object... params) { + NodeSource source = getNodeSource(key); + return async(false, source, codec, command, params, false, false); + } + private final AtomicBoolean sortRoSupported = new AtomicBoolean(true); public RFuture async(boolean readOnlyMode, NodeSource source, Codec codec, @@ -543,16 +571,95 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture readBatchedAsync(Codec codec, RedisCommand command, SlotCallback callback, String... keys) { + public RFuture readBatchedAsync(Codec codec, RedisCommand command, SlotCallback callback, Object... keys) { return executeBatchedAsync(true, codec, command, callback, keys); } @Override - public RFuture writeBatchedAsync(Codec codec, RedisCommand command, SlotCallback callback, String... keys) { + public RFuture writeBatchedAsync(Codec codec, RedisCommand command, SlotCallback callback, Object... keys) { return executeBatchedAsync(false, codec, command, callback, keys); } - - private RFuture executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand command, SlotCallback callback, String[] keys) { + + @Override + public RFuture evalWriteBatchedAsync(Codec codec, RedisCommand command, String script, List keys, SlotCallback callback) { + return evalWriteBatchedAsync(false, codec, command, script, keys, callback); + } + + private RFuture evalWriteBatchedAsync(boolean readOnly, Codec codec, RedisCommand command, String script, List keys, SlotCallback callback) { + if (!connectionManager.isClusterMode()) { + Object[] keysArray = callback.createParams(keys); + Object[] paramsArray = callback.createParams(null); + if (readOnly) { + return evalReadAsync((String) null, codec, command, script, Arrays.asList(keysArray), paramsArray); + } + return evalWriteAsync((String) null, codec, command, script, Arrays.asList(keysArray), paramsArray); + } + + Map>> entry2keys = keys.stream().collect( + Collectors.groupingBy(k -> { + int slot; + if (k instanceof String) { + slot = connectionManager.calcSlot((String) k); + } else if (k instanceof ByteBuf) { + slot = connectionManager.calcSlot((ByteBuf) k); + } else { + throw new IllegalArgumentException(); + } + return connectionManager.getWriteEntry(slot); + }, Collectors.groupingBy(k -> { + if (k instanceof String) { + return connectionManager.calcSlot((String) k); + } else if (k instanceof ByteBuf) { + return connectionManager.calcSlot((ByteBuf) k); + } else { + throw new IllegalArgumentException(); + } + }, Collectors.toList()))); + + List> futures = new ArrayList<>(); + for (Entry>> entry : entry2keys.entrySet()) { + // executes in batch due to CROSSLOT error + CommandBatchService executorService; + if (this instanceof CommandBatchService) { + executorService = (CommandBatchService) this; + } else { + executorService = new CommandBatchService(this); + } + + for (List groupedKeys : entry.getValue().values()) { + RedisCommand c = command; + RedisCommand newCommand = callback.createCommand(groupedKeys); + if (newCommand != null) { + c = newCommand; + } + Object[] keysArray = callback.createParams(groupedKeys); + Object[] paramsArray = callback.createParams(null); + if (readOnly) { + RFuture f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); + futures.add(f.toCompletableFuture()); + } else { + RFuture f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); + futures.add(f.toCompletableFuture()); + } + } + + if (!(this instanceof CommandBatchService)) { + executorService.executeAsync(); + } + } + + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + CompletableFuture result = future.thenApply(r -> { + futures.forEach(f -> { + callback.onSlotResult((T) f.join()); + }); + return callback.onFinish(); + }); + + return new CompletableFutureWrapper<>(result); + } + + private RFuture executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand command, SlotCallback callback, Object[] keys) { if (!connectionManager.isClusterMode()) { Object[] params = callback.createParams(Arrays.asList(keys)); if (readOnly) { @@ -561,16 +668,29 @@ public class CommandAsyncService implements CommandAsyncExecutor { return writeAsync((String) null, codec, command, params); } - Map>> entry2keys = Arrays.stream(keys).collect( + Map>> entry2keys = Arrays.stream(keys).collect( Collectors.groupingBy(k -> { - int slot = connectionManager.calcSlot(k); + int slot; + if (k instanceof String) { + slot = connectionManager.calcSlot((String) k); + } else if (k instanceof ByteBuf) { + slot = connectionManager.calcSlot((ByteBuf) k); + } else { + throw new IllegalArgumentException(); + } return connectionManager.getWriteEntry(slot); }, Collectors.groupingBy(k -> { - return connectionManager.calcSlot(k); - }, Collectors.toList()))); + if (k instanceof String) { + return connectionManager.calcSlot((String) k); + } else if (k instanceof ByteBuf) { + return connectionManager.calcSlot((ByteBuf) k); + } else { + throw new IllegalArgumentException(); + } + }, Collectors.toList()))); List> futures = new ArrayList<>(); - for (Entry>> entry : entry2keys.entrySet()) { + for (Entry>> entry : entry2keys.entrySet()) { // executes in batch due to CROSSLOT error CommandBatchService executorService; if (this instanceof CommandBatchService) { @@ -579,7 +699,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { executorService = new CommandBatchService(this); } - for (List groupedKeys : entry.getValue().values()) { + for (List groupedKeys : entry.getValue().values()) { RedisCommand c = command; RedisCommand newCommand = callback.createCommand(groupedKeys); if (newCommand != null) { diff --git a/redisson/src/main/java/org/redisson/connection/CRC16.java b/redisson/src/main/java/org/redisson/connection/CRC16.java index 2ac5f55b8..ec30d49c5 100644 --- a/redisson/src/main/java/org/redisson/connection/CRC16.java +++ b/redisson/src/main/java/org/redisson/connection/CRC16.java @@ -15,6 +15,8 @@ */ package org.redisson.connection; +import io.netty.buffer.ByteBuf; + /** * @author Mark Paluch **/ @@ -55,4 +57,14 @@ public final class CRC16 { return crc & 0xFFFF; } + public static int crc16(ByteBuf bytes) { + int crc = 0x0000; + + for (int i = 0; i < bytes.readableBytes(); i++) { + byte b = bytes.getByte(bytes.readerIndex() + i); + crc = (crc << 8) ^ LOOKUP_TABLE[((crc >>> 8) ^ (b & 0xFF)) & 0xFF]; + } + return crc & 0xFFFF; + } + } diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index a3579b742..5b83e7d2f 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -15,6 +15,7 @@ */ package org.redisson.connection; +import io.netty.buffer.ByteBuf; import org.redisson.api.NodeType; import org.redisson.client.RedisClient; import org.redisson.misc.RedisURI; @@ -40,7 +41,9 @@ public interface ConnectionManager { boolean isClusterMode(); int calcSlot(String key); - + + int calcSlot(ByteBuf key); + int calcSlot(byte[] key); Collection getEntrySet(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index bf7970c28..d3250a408 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -15,6 +15,7 @@ */ package org.redisson.connection; +import io.netty.buffer.ByteBuf; import org.redisson.api.NodeType; import org.redisson.client.*; import org.redisson.cluster.ClusterSlotRange; @@ -400,6 +401,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return singleSlotRange.getStartSlot(); } + @Override + public int calcSlot(ByteBuf key) { + return singleSlotRange.getStartSlot(); + } + @Override public MasterSlaveEntry getEntry(InetSocketAddress address) { lazyConnect(); diff --git a/redisson/src/main/java/org/redisson/connection/decoder/BucketsDecoder.java b/redisson/src/main/java/org/redisson/connection/decoder/BucketsDecoder.java index be4ddd5dc..32a7ded84 100644 --- a/redisson/src/main/java/org/redisson/connection/decoder/BucketsDecoder.java +++ b/redisson/src/main/java/org/redisson/connection/decoder/BucketsDecoder.java @@ -29,9 +29,9 @@ import java.util.Map; */ public class BucketsDecoder implements MultiDecoder> { - private final List keys; + private final List keys; - public BucketsDecoder(List keys) { + public BucketsDecoder(List keys) { this.keys = keys; }