Merge branch 'redisson:master' into master

pull/5294/head
zzhlhc 1 year ago committed by GitHub
commit 503a63c740
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,14 +17,14 @@
<dependency>
<groupId>org.hibernate.orm</groupId>
<artifactId>hibernate-core</artifactId>
<version>6.2.5.Final</version>
<version>6.3.0.Final</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.hibernate.orm</groupId>
<artifactId>hibernate-testing</artifactId>
<version>6.2.5.Final</version>
<version>6.3.0.Final</version>
<scope>test</scope>
</dependency>

@ -102,10 +102,10 @@ public class RedissonBuckets implements RBuckets {
}
@Override
public RedisCommand<Map<Object, Object>> createCommand(List<String> keys) {
public RedisCommand<Map<Object, Object>> createCommand(List<Object> keys) {
return new RedisCommand<>("MGET", new BucketsDecoder(keys));
}
}, keysList.toArray(new String[0]));
}, keysList.toArray(new Object[0]));
}
@Override
@ -132,9 +132,9 @@ public class RedissonBuckets implements RBuckets {
}
@Override
public Object[] createParams(List<String> keys) {
public Object[] createParams(List<Object> keys) {
List<Object> params = new ArrayList<>(keys.size());
for (String key : keys) {
for (Object key : keys) {
params.add(key);
try {
params.add(codec.getValueEncoder().encode(mappedBuckets.get(key)));
@ -144,7 +144,7 @@ public class RedissonBuckets implements RBuckets {
}
return params.toArray();
}
}, mappedBuckets.keySet().toArray(new String[]{}));
}, mappedBuckets.keySet().toArray(new Object[0]));
}
private Map<String, ?> map(Map<String, ?> buckets) {
@ -172,9 +172,9 @@ public class RedissonBuckets implements RBuckets {
}
@Override
public Object[] createParams(List<String> keys) {
public Object[] createParams(List<Object> keys) {
List<Object> params = new ArrayList<>(keys.size());
for (String key : keys) {
for (Object key : keys) {
params.add(key);
try {
params.add(codec.getValueEncoder().encode(mappedBuckets.get(key)));
@ -184,7 +184,7 @@ public class RedissonBuckets implements RBuckets {
}
return params.toArray();
}
}, mappedBuckets.keySet().toArray(new String[]{}));
}, mappedBuckets.keySet().toArray(new Object[0]));
}
}

@ -28,11 +28,11 @@ import java.util.List;
*/
public interface SlotCallback<T, R> {
default RedisCommand<T> createCommand(List<String> params) {
default RedisCommand<T> createCommand(List<Object> params) {
return null;
}
default Object[] createParams(List<String> params) {
default Object[] createParams(List<Object> params) {
return params.toArray();
}

@ -15,6 +15,7 @@
*/
package org.redisson.cluster;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
@ -791,7 +792,26 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
int result = CRC16.crc16(key) % MAX_SLOT;
return result;
}
@Override
public int calcSlot(ByteBuf key) {
if (key == null) {
return 0;
}
int start = key.indexOf(key.readerIndex(), key.readerIndex() + key.readableBytes(), (byte) '{');
if (start != -1) {
int end = key.indexOf(start + 1, key.readerIndex() + key.readableBytes(), (byte) '}');
if (end != -1 && start + 1 < end) {
key = key.slice(start + 1, end-start - 1);
}
}
int result = CRC16.crc16(key) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}
@Override
public int calcSlot(String key) {
if (key == null) {

@ -66,13 +66,15 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params);
<R> List<CompletableFuture<R>> executeAllAsync(MasterSlaveEntry entry, RedisCommand<?> command, Object... params);
@ -90,15 +92,21 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalReadAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteAsync(ByteBuf key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteNoRetryAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params);
@ -110,13 +118,13 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec,
RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry);
RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry);
<V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<?> command, long secondsTimeout, String... queueNames);
@ -126,9 +134,11 @@ public interface CommandAsyncExecutor {
ByteBuf encodeMapValue(Codec codec, Object value);
<T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys);
<T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... keys);
<T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... keys);
<T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys);
<T, R> RFuture<R> evalWriteBatchedAsync(Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback);
boolean isEvalShaROSupported();

@ -308,7 +308,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
int slot = connectionManager.calcSlot(key);
return new NodeSource(slot);
}
private NodeSource getNodeSource(ByteBuf key) {
int slot = connectionManager.calcSlot(key);
return new NodeSource(slot);
}
@Override
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
NodeSource source = getNodeSource(key);
@ -321,6 +326,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return async(true, source, codec, command, params, false, false);
}
@Override
public <T, R> RFuture<R> readAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object... params) {
NodeSource source = getNodeSource(key);
return async(true, source, codec, command, params, false, false);
}
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
return async(true, new NodeSource(entry), codec, command, params, false, false);
}
@ -358,12 +369,23 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, false, params);
}
@Override
public <T, R> RFuture<R> evalReadAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(entry, client), true, codec, evalCommandType, script, keys, false, params);
}
@Override
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, false, params);
}
@Override
public <T, R> RFuture<R> evalWriteAsync(ByteBuf key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, false, params);
}
@Override
public <T, R> RFuture<R> evalWriteNoRetryAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = getNodeSource(key);
@ -504,6 +526,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return async(false, source, codec, command, params, false, false);
}
@Override
public <T, R> RFuture<R> writeAsync(ByteBuf key, Codec codec, RedisCommand<T> command, Object... params) {
NodeSource source = getNodeSource(key);
return async(false, source, codec, command, params, false, false);
}
private final AtomicBoolean sortRoSupported = new AtomicBoolean(true);
public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec,
@ -543,16 +571,95 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys) {
public <T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... keys) {
return executeBatchedAsync(true, codec, command, callback, keys);
}
@Override
public <T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys) {
public <T, R> RFuture<R> writeBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... keys) {
return executeBatchedAsync(false, codec, command, callback, keys);
}
private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String[] keys) {
@Override
public <T, R> RFuture<R> evalWriteBatchedAsync(Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback) {
return evalWriteBatchedAsync(false, codec, command, script, keys, callback);
}
private <T, R> RFuture<R> evalWriteBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback) {
if (!connectionManager.isClusterMode()) {
Object[] keysArray = callback.createParams(keys);
Object[] paramsArray = callback.createParams(null);
if (readOnly) {
return evalReadAsync((String) null, codec, command, script, Arrays.asList(keysArray), paramsArray);
}
return evalWriteAsync((String) null, codec, command, script, Arrays.asList(keysArray), paramsArray);
}
Map<MasterSlaveEntry, Map<Integer, List<Object>>> entry2keys = keys.stream().collect(
Collectors.groupingBy(k -> {
int slot;
if (k instanceof String) {
slot = connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
slot = connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
return connectionManager.getWriteEntry(slot);
}, Collectors.groupingBy(k -> {
if (k instanceof String) {
return connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
return connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
}, Collectors.toList())));
List<CompletableFuture<?>> futures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error
CommandBatchService executorService;
if (this instanceof CommandBatchService) {
executorService = (CommandBatchService) this;
} else {
executorService = new CommandBatchService(this);
}
for (List<Object> groupedKeys : entry.getValue().values()) {
RedisCommand<T> c = command;
RedisCommand<T> newCommand = callback.createCommand(groupedKeys);
if (newCommand != null) {
c = newCommand;
}
Object[] keysArray = callback.createParams(groupedKeys);
Object[] paramsArray = callback.createParams(null);
if (readOnly) {
RFuture<T> f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.add(f.toCompletableFuture());
} else {
RFuture<T> f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.add(f.toCompletableFuture());
}
}
if (!(this instanceof CommandBatchService)) {
executorService.executeAsync();
}
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> result = future.thenApply(r -> {
futures.forEach(f -> {
callback.onSlotResult((T) f.join());
});
return callback.onFinish();
});
return new CompletableFutureWrapper<>(result);
}
private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object[] keys) {
if (!connectionManager.isClusterMode()) {
Object[] params = callback.createParams(Arrays.asList(keys));
if (readOnly) {
@ -561,16 +668,29 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return writeAsync((String) null, codec, command, params);
}
Map<MasterSlaveEntry, Map<Integer, List<String>>> entry2keys = Arrays.stream(keys).collect(
Map<MasterSlaveEntry, Map<Integer, List<Object>>> entry2keys = Arrays.stream(keys).collect(
Collectors.groupingBy(k -> {
int slot = connectionManager.calcSlot(k);
int slot;
if (k instanceof String) {
slot = connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
slot = connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
return connectionManager.getWriteEntry(slot);
}, Collectors.groupingBy(k -> {
return connectionManager.calcSlot(k);
}, Collectors.toList())));
if (k instanceof String) {
return connectionManager.calcSlot((String) k);
} else if (k instanceof ByteBuf) {
return connectionManager.calcSlot((ByteBuf) k);
} else {
throw new IllegalArgumentException();
}
}, Collectors.toList())));
List<CompletableFuture<?>> futures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<String>>> entry : entry2keys.entrySet()) {
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error
CommandBatchService executorService;
if (this instanceof CommandBatchService) {
@ -579,7 +699,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
executorService = new CommandBatchService(this);
}
for (List<String> groupedKeys : entry.getValue().values()) {
for (List<Object> groupedKeys : entry.getValue().values()) {
RedisCommand<T> c = command;
RedisCommand<T> newCommand = callback.createCommand(groupedKeys);
if (newCommand != null) {

@ -15,6 +15,8 @@
*/
package org.redisson.connection;
import io.netty.buffer.ByteBuf;
/**
* @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
**/
@ -55,4 +57,14 @@ public final class CRC16 {
return crc & 0xFFFF;
}
public static int crc16(ByteBuf bytes) {
int crc = 0x0000;
for (int i = 0; i < bytes.readableBytes(); i++) {
byte b = bytes.getByte(bytes.readerIndex() + i);
crc = (crc << 8) ^ LOOKUP_TABLE[((crc >>> 8) ^ (b & 0xFF)) & 0xFF];
}
return crc & 0xFFFF;
}
}

@ -15,6 +15,7 @@
*/
package org.redisson.connection;
import io.netty.buffer.ByteBuf;
import org.redisson.api.NodeType;
import org.redisson.client.RedisClient;
import org.redisson.misc.RedisURI;
@ -40,7 +41,9 @@ public interface ConnectionManager {
boolean isClusterMode();
int calcSlot(String key);
int calcSlot(ByteBuf key);
int calcSlot(byte[] key);
Collection<MasterSlaveEntry> getEntrySet();

@ -15,6 +15,7 @@
*/
package org.redisson.connection;
import io.netty.buffer.ByteBuf;
import org.redisson.api.NodeType;
import org.redisson.client.*;
import org.redisson.cluster.ClusterSlotRange;
@ -400,6 +401,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return singleSlotRange.getStartSlot();
}
@Override
public int calcSlot(ByteBuf key) {
return singleSlotRange.getStartSlot();
}
@Override
public MasterSlaveEntry getEntry(InetSocketAddress address) {
lazyConnect();

@ -29,9 +29,9 @@ import java.util.Map;
*/
public class BucketsDecoder implements MultiDecoder<Map<Object, Object>> {
private final List<String> keys;
private final List<Object> keys;
public BucketsDecoder(List<String> keys) {
public BucketsDecoder(List<Object> keys) {
this.keys = keys;
}

Loading…
Cancel
Save