|
|
|
@ -22,8 +22,15 @@ import org.redisson.api.RType;
|
|
|
|
|
import org.redisson.client.RedisClient;
|
|
|
|
|
import org.redisson.client.RedisException;
|
|
|
|
|
import org.redisson.client.codec.StringCodec;
|
|
|
|
|
import org.redisson.client.handler.State;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommand;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.redisson.client.protocol.RedisStrictCommand;
|
|
|
|
|
import org.redisson.client.protocol.convertor.Convertor;
|
|
|
|
|
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
|
|
|
|
|
import org.redisson.client.protocol.decoder.ListScanResult;
|
|
|
|
|
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
|
|
|
|
|
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
|
|
|
|
|
import org.redisson.command.CommandAsyncExecutor;
|
|
|
|
|
import org.redisson.command.CommandBatchService;
|
|
|
|
|
import org.redisson.connection.ConnectionManager;
|
|
|
|
@ -71,7 +78,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<RType> getTypeAsync(String key) {
|
|
|
|
|
return commandExecutor.readAsync(key, RedisCommands.TYPE, key);
|
|
|
|
|
return commandExecutor.readAsync(map(key), RedisCommands.TYPE, map(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -81,7 +88,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> getSlotAsync(String key) {
|
|
|
|
|
return commandExecutor.readAsync(null, RedisCommands.KEYSLOT, key);
|
|
|
|
|
return commandExecutor.readAsync(null, RedisCommands.KEYSLOT, map(key));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -89,9 +96,17 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
return getKeysByPattern(pattern, 10);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private final RedisCommand<ListScanResult<String>> scan = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder2(
|
|
|
|
|
new ListScanResultReplayDecoder() {
|
|
|
|
|
@Override
|
|
|
|
|
public ListScanResult<Object> decode(List<Object> parts, State state) {
|
|
|
|
|
return new ListScanResult<>((Long) parts.get(0), (List<Object>) (Object) unmap((List<String>) parts.get(1)));
|
|
|
|
|
}
|
|
|
|
|
}, new ObjectListReplayDecoder<String>()));
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Iterable<String> getKeysByPattern(String pattern, int count) {
|
|
|
|
|
return getKeysByPattern(RedisCommands.SCAN, pattern, 0, count);
|
|
|
|
|
return getKeysByPattern(scan, pattern, 0, count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public <T> Iterable<T> getKeysByPattern(RedisCommand<?> command, String pattern, int limit, int count) {
|
|
|
|
@ -115,7 +130,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Iterable<String> getKeysWithLimit(String pattern, int limit) {
|
|
|
|
|
return getKeysByPattern(RedisCommands.SCAN, pattern, limit, limit);
|
|
|
|
|
return getKeysByPattern(scan, pattern, limit, limit);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -128,7 +143,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
return getKeysByPattern(null, count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand<?> command, long startPos,
|
|
|
|
|
private RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand<?> command, long startPos,
|
|
|
|
|
String pattern, int count) {
|
|
|
|
|
if (pattern == null) {
|
|
|
|
|
return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, command, startPos, "COUNT",
|
|
|
|
@ -140,7 +155,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
public RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos,
|
|
|
|
|
String pattern, int count) {
|
|
|
|
|
return scanIteratorAsync(client, entry, RedisCommands.SCAN, startPos, pattern, count);
|
|
|
|
|
return scanIteratorAsync(client, entry, scan, startPos, pattern, count);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <T> Iterator<T> createKeysIterator(MasterSlaveEntry entry, RedisCommand<?> command, String pattern, int count) {
|
|
|
|
@ -171,7 +186,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
return new CompletableFutureWrapper<>(0L);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return commandExecutor.writeBatchedAsync(null, RedisCommands.TOUCH_LONG, new LongSlotCallback(), names);
|
|
|
|
|
return commandExecutor.writeBatchedAsync(null, RedisCommands.TOUCH_LONG, new LongSlotCallback(), map(names));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -185,11 +200,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
return new CompletableFutureWrapper<>(0L);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
List<String> keysList = Arrays.stream(names)
|
|
|
|
|
.map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k))
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
|
|
|
|
return commandExecutor.readBatchedAsync(StringCodec.INSTANCE, RedisCommands.EXISTS_LONG, new LongSlotCallback(), keysList.toArray(new String[0]));
|
|
|
|
|
return commandExecutor.readBatchedAsync(StringCodec.INSTANCE, RedisCommands.EXISTS_LONG, new LongSlotCallback(), map(names));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -197,9 +208,19 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
return commandExecutor.get(randomKeyAsync());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private final RedisStrictCommand<String> randomKey = new RedisStrictCommand<String>("RANDOMKEY", new Convertor<String>() {
|
|
|
|
|
@Override
|
|
|
|
|
public String convert(Object obj) {
|
|
|
|
|
if (obj == null) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
return unmap((String) obj);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<String> randomKeyAsync() {
|
|
|
|
|
return commandExecutor.readRandomAsync(StringCodec.INSTANCE, RedisCommands.RANDOM_KEY);
|
|
|
|
|
return commandExecutor.readRandomAsync(StringCodec.INSTANCE, randomKey);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -233,7 +254,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
|
|
|
|
|
long count = 0;
|
|
|
|
|
try {
|
|
|
|
|
Iterator<String> keysIterator = createKeysIterator(entry, RedisCommands.SCAN, pattern, batchSize);
|
|
|
|
|
Iterator<String> keysIterator = createKeysIterator(entry, scan, pattern, batchSize);
|
|
|
|
|
List<String> keys = new ArrayList<>();
|
|
|
|
|
while (keysIterator.hasNext()) {
|
|
|
|
|
String key = keysIterator.next();
|
|
|
|
@ -290,12 +311,12 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> deleteAsync(RObject... objects) {
|
|
|
|
|
List<String> keys = new ArrayList<String>();
|
|
|
|
|
List<String> keys = new ArrayList<>();
|
|
|
|
|
for (RObject obj : objects) {
|
|
|
|
|
keys.add(((RedissonObject) obj).getRawName());
|
|
|
|
|
keys.add(obj.getName());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return deleteAsync(keys.toArray(new String[keys.size()]));
|
|
|
|
|
return deleteAsync(keys.toArray(new String[0]));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -309,7 +330,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
return new CompletableFutureWrapper<>(0L);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return commandExecutor.writeBatchedAsync(null, RedisCommands.UNLINK, new LongSlotCallback(), keys);
|
|
|
|
|
return commandExecutor.writeBatchedAsync(null, RedisCommands.UNLINK, new LongSlotCallback(), map(keys));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -318,7 +339,27 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
return new CompletableFutureWrapper<>(0L);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return commandExecutor.writeBatchedAsync(null, RedisCommands.DEL, new LongSlotCallback(), keys);
|
|
|
|
|
return commandExecutor.writeBatchedAsync(null, RedisCommands.DEL, new LongSlotCallback(), map(keys));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String map(String key) {
|
|
|
|
|
return commandExecutor.getConnectionManager().getConfig().getNameMapper().map(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String unmap(String key) {
|
|
|
|
|
return commandExecutor.getConnectionManager().getConfig().getNameMapper().unmap(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private List<String> unmap(List<String> keys) {
|
|
|
|
|
return keys.stream()
|
|
|
|
|
.map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().unmap(k))
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String[] map(String[] keys) {
|
|
|
|
|
return Arrays.stream(keys)
|
|
|
|
|
.map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k))
|
|
|
|
|
.toArray(String[]::new);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -381,7 +422,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> remainTimeToLiveAsync(String name) {
|
|
|
|
|
return commandExecutor.readAsync(name, StringCodec.INSTANCE, RedisCommands.PTTL, name);
|
|
|
|
|
return commandExecutor.readAsync(map(name), StringCodec.INSTANCE, RedisCommands.PTTL, map(name));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -391,7 +432,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> renameAsync(String currentName, String newName) {
|
|
|
|
|
return commandExecutor.writeAsync(currentName, RedisCommands.RENAME, currentName, newName);
|
|
|
|
|
return commandExecutor.writeAsync(map(currentName), RedisCommands.RENAME, map(currentName), map(newName));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -401,7 +442,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> renamenxAsync(String oldName, String newName) {
|
|
|
|
|
return commandExecutor.writeAsync(oldName, RedisCommands.RENAMENX, oldName, newName);
|
|
|
|
|
return commandExecutor.writeAsync(map(oldName), RedisCommands.RENAMENX, map(oldName), map(newName));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -411,7 +452,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> clearExpireAsync(String name) {
|
|
|
|
|
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PERSIST, name);
|
|
|
|
|
return commandExecutor.writeAsync(map(name), StringCodec.INSTANCE, RedisCommands.PERSIST, map(name));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -421,7 +462,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> expireAtAsync(String name, long timestamp) {
|
|
|
|
|
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIREAT, name, timestamp);
|
|
|
|
|
return commandExecutor.writeAsync(map(name), StringCodec.INSTANCE, RedisCommands.PEXPIREAT, map(name), timestamp);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -431,7 +472,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> expireAsync(String name, long timeToLive, TimeUnit timeUnit) {
|
|
|
|
|
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIRE, name,
|
|
|
|
|
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIRE, map(name),
|
|
|
|
|
timeUnit.toMillis(timeToLive));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -442,7 +483,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> migrateAsync(String name, String host, int port, int database, long timeout) {
|
|
|
|
|
return commandExecutor.writeAsync(name, RedisCommands.MIGRATE, host, port, name, database, timeout);
|
|
|
|
|
return commandExecutor.writeAsync(map(name), RedisCommands.MIGRATE, host, port, map(name), database, timeout);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -452,7 +493,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Void> copyAsync(String name, String host, int port, int database, long timeout) {
|
|
|
|
|
return commandExecutor.writeAsync(name, RedisCommands.MIGRATE, host, port, name, database, timeout, "COPY");
|
|
|
|
|
return commandExecutor.writeAsync(map(name), RedisCommands.MIGRATE, host, port, map(name), database, timeout, "COPY");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -462,7 +503,7 @@ public class RedissonKeys implements RKeys {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> moveAsync(String name, int database) {
|
|
|
|
|
return commandExecutor.writeAsync(name, RedisCommands.MOVE, name, database);
|
|
|
|
|
return commandExecutor.writeAsync(map(name), RedisCommands.MOVE, map(name), database);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|