Feature - copy() and copyAndReplace() methods added to RObject #6057

pull/6077/head
Nikita Koksharov 8 months ago
parent 91c8ff5e22
commit 017b8306b4

@ -407,4 +407,9 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
return commandExecutor.handleNoSync(ttlRemainingFuture, () -> unlockInnerAsync(threadId));
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
throw new UnsupportedOperationException();
}
}

@ -260,9 +260,17 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
return deleteAsync(getRawName(), configName);
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
String newName = (String) keys.get(1);
List<Object> kks = Arrays.asList(getRawName(), configName,
newName, suffixName(newName, "config"));
return super.copyAsync(kks, database, replace);
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.<Object>asList(getRawName(), configName);
List<Object> keys = Arrays.asList(getRawName(), configName);
return super.sizeInMemoryAsync(keys);
}
@ -398,13 +406,13 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override
public RFuture<Void> renameAsync(String newName) {
String newConfigName = suffixName(newName, "config");
String newConfigName = suffixName(mapName(newName), "config");
RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"if redis.call('exists', KEYS[1]) == 1 then " +
"redis.call('rename', KEYS[1], ARGV[1]); " +
"end; " +
"return redis.call('rename', KEYS[2], ARGV[2]); ",
Arrays.<Object>asList(getRawName(), configName), newName, newConfigName);
Arrays.asList(getRawName(), configName), mapName(newName), newConfigName);
CompletionStage<Void> f = future.thenApply(value -> {
setName(newName);
this.configName = newConfigName;
@ -415,7 +423,7 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override
public RFuture<Boolean> renamenxAsync(String newName) {
String newConfigName = suffixName(newName, "config");
String newConfigName = suffixName(mapName(newName), "config");
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local r = redis.call('renamenx', KEYS[1], ARGV[1]); "
+ "if r == 0 then "
@ -423,7 +431,7 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
+ "else "
+ " return redis.call('renamenx', KEYS[2], ARGV[2]); "
+ "end; ",
Arrays.asList(getRawName(), configName), newName, newConfigName);
Arrays.asList(getRawName(), configName), mapName(newName), newConfigName);
CompletionStage<Boolean> f = future.thenApply(value -> {
if (value) {
setName(newName);

@ -46,23 +46,26 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
private final RedissonBlockingQueue<V> blockingQueue;
private final RedissonQueueSemaphore semaphore;
private final String channelName;
private final String semaphoreName;
protected RedissonBoundedBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
blockingQueue = new RedissonBlockingQueue<>(commandExecutor, name, redisson);
semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), getServiceManager().getCfg().getCodec());
semaphoreName = getSemaphoreName(getRawName());
semaphore = new RedissonQueueSemaphore(commandExecutor, semaphoreName, getServiceManager().getCfg().getCodec());
channelName = RedissonSemaphore.getChannelName(semaphore.getRawName());
}
protected RedissonBoundedBlockingQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
blockingQueue = new RedissonBlockingQueue<>(commandExecutor, name, redisson);
semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), codec);
semaphoreName = getSemaphoreName(getRawName());
semaphore = new RedissonQueueSemaphore(commandExecutor, semaphoreName, codec);
channelName = RedissonSemaphore.getChannelName(semaphore.getRawName());
}
private String getSemaphoreName() {
return prefixName("redisson_bqs", getRawName());
private String getSemaphoreName(String name) {
return prefixName("redisson_bqs", name);
}
@Override
@ -88,7 +91,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
}
private RedissonQueueSemaphore createSemaphore(V e) {
RedissonQueueSemaphore semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), getCodec());
RedissonQueueSemaphore semaphore = new RedissonQueueSemaphore(commandExecutor, semaphoreName, getCodec());
semaphore.setQueueName(getRawName());
semaphore.setValue(e);
return semaphore;
@ -379,28 +382,36 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public RFuture<Boolean> deleteAsync() {
return deleteAsync(getRawName(), getSemaphoreName());
return deleteAsync(getRawName(), semaphoreName);
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
String newName = (String) keys.get(1);
List<Object> kks = Arrays.asList(getRawName(), semaphoreName,
newName, getSemaphoreName(newName));
return super.copyAsync(kks, database, replace);
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.<Object>asList(getRawName(), getSemaphoreName());
List<Object> keys = Arrays.<Object>asList(getRawName(), semaphoreName);
return super.sizeInMemoryAsync(keys);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), getSemaphoreName());
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), semaphoreName);
}
@Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
return super.expireAtAsync(timestamp, param, getRawName(), getSemaphoreName());
return super.expireAtAsync(timestamp, param, getRawName(), semaphoreName);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
return clearExpireAsync(getRawName(), getSemaphoreName());
return clearExpireAsync(getRawName(), semaphoreName);
}
@Override
@ -409,7 +420,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
return new CompletableFutureWrapper<>(false);
}
RedissonQueueSemaphore semaphore = new RedissonQueueSemaphore(commandExecutor, getSemaphoreName(), getCodec());
RedissonQueueSemaphore semaphore = new RedissonQueueSemaphore(commandExecutor, semaphoreName, getCodec());
semaphore.setQueueName(getRawName());
semaphore.setValues(c);
return semaphore.tryAcquireAsync();

@ -43,7 +43,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
super(codec, commandExecutor, name);
channelName = prefixName("redisson_delay_queue_channel", getRawName());
queueName = prefixName("redisson_delay_queue", getRawName());
timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());
timeoutSetName = getTimeoutSetName(getRawName());
QueueTransferTask task = new QueueTransferTask(commandExecutor.getServiceManager()) {
@ -78,6 +78,10 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
commandExecutor.getServiceManager().getQueueTransferService().schedule(queueName, task);
}
private String getTimeoutSetName(String rawName) {
return prefixName("redisson_delay_queue_timeout", rawName);
}
@Override
public void offer(V e, long delay, TimeUnit timeUnit) {
get(offerAsync(e, delay, timeUnit));
@ -425,6 +429,14 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
return super.sizeInMemoryAsync(keys);
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
String newName = (String) keys.get(1);
List<Object> kks = Arrays.asList(queueName, timeoutSetName,
newName, getTimeoutSetName(newName));
return super.copyAsync(kks, database, replace);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
return super.expireAsync(timeToLive, timeUnit, param, queueName, timeoutSetName);

@ -43,12 +43,15 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
final Logger log = LoggerFactory.getLogger(getClass());
private final String allocationSizeName;
RedissonIdGenerator(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
allocationSizeName = getAllocationSizeName(getRawName());
}
private String getAllocationSizeName() {
return suffixName(getRawName(), "allocation");
private String getAllocationSizeName(String name) {
return suffixName(name, "allocation");
}
@Override
@ -66,7 +69,7 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
return commandExecutor.evalWriteNoRetryAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('setnx', KEYS[1], ARGV[1]); "
+ "return redis.call('setnx', KEYS[2], ARGV[2]); ",
Arrays.asList(getRawName(), getAllocationSizeName()), value, allocationSize);
Arrays.asList(getRawName(), allocationSizeName), value, allocationSize);
}
private final AtomicLong start = new AtomicLong();
@ -122,7 +125,7 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
"end; " +
"redis.call('incrby', KEYS[1], allocationSize); " +
"return {value, allocationSize}; ",
Arrays.asList(getRawName(), getAllocationSizeName()));
Arrays.asList(getRawName(), allocationSizeName));
future.whenComplete((res, ex) -> {
if (ex != null) {
if (getServiceManager().isShuttingDown(ex)) {
@ -162,27 +165,35 @@ public class RedissonIdGenerator extends RedissonExpirable implements RIdGenerat
@Override
public RFuture<Boolean> deleteAsync() {
return deleteAsync(getRawName(), getAllocationSizeName());
return deleteAsync(getRawName(), allocationSizeName);
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
return super.sizeInMemoryAsync(Arrays.asList(getRawName(), getAllocationSizeName()));
return super.sizeInMemoryAsync(Arrays.asList(getRawName(), allocationSizeName));
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
String newName = (String) keys.get(1);
List<Object> kks = Arrays.asList(getRawName(), allocationSizeName,
newName, getAllocationSizeName(newName));
return super.copyAsync(kks, database, replace);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), getAllocationSizeName());
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), allocationSizeName);
}
@Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
return super.expireAtAsync(timestamp, param, getRawName(), getAllocationSizeName());
return super.expireAtAsync(timestamp, param, getRawName(), allocationSizeName);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
return clearExpireAsync(getRawName(), getAllocationSizeName());
return clearExpireAsync(getRawName(), allocationSizeName);
}
}

@ -111,6 +111,11 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
Arrays.<Object>asList(timeoutSetName, getRawName()), System.currentTimeMillis(), key);
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
throw new UnsupportedOperationException();
}
@Override
public int size() {
return get(sizeAsync());

@ -66,13 +66,21 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
private final EvictionScheduler evictionScheduler;
protected String publishCommand;
private String timeoutSetName;
private String idleSetName;
private String lastAccessTimeSetName;
private String optionsName;
public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor,
String name, RedissonClient redisson, MapCacheOptions<K, V> options, WriteBehindService writeBehindService) {
super(commandExecutor, name, redisson, options, writeBehindService);
this.timeoutSetName = getTimeoutSetName(getRawName());
this.idleSetName = getIdleSetName(getRawName());
this.lastAccessTimeSetName = getLastAccessTimeSetName(getRawName());
this.optionsName = getOptionsName(getRawName());
if (evictionScheduler != null) {
evictionScheduler.schedule(getRawName(), getTimeoutSetName(), getIdleSetName(),
getExpiredChannelName(), getLastAccessTimeSetName(), options,
evictionScheduler.schedule(getRawName(), timeoutSetName, idleSetName,
getExpiredChannelName(), lastAccessTimeSetName, options,
commandExecutor.getConnectionManager().getSubscribeService().getPublishCommand());
}
this.evictionScheduler = evictionScheduler;
@ -82,9 +90,13 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor,
String name, RedissonClient redisson, MapCacheOptions<K, V> options, WriteBehindService writeBehindService) {
super(codec, commandExecutor, name, redisson, options, writeBehindService);
this.timeoutSetName = getTimeoutSetName(getRawName());
this.idleSetName = getIdleSetName(getRawName());
this.lastAccessTimeSetName = getLastAccessTimeSetName(getRawName());
this.optionsName = getOptionsName(getRawName());
if (evictionScheduler != null) {
evictionScheduler.schedule(getRawName(), getTimeoutSetName(), getIdleSetName(),
getExpiredChannelName(), getLastAccessTimeSetName(), options,
evictionScheduler.schedule(getRawName(), timeoutSetName, idleSetName,
getExpiredChannelName(), lastAccessTimeSetName, options,
commandExecutor.getConnectionManager().getSubscribeService().getPublishCommand());
}
this.evictionScheduler = evictionScheduler;
@ -104,7 +116,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
public RFuture<Boolean> trySetMaxSizeAsync(int maxSize) {
return trySetMaxSizeAsync(maxSize, EvictionMode.LRU);
}
public RFuture<Boolean> trySetMaxSizeAsync(int maxSize, EvictionMode mode) {
if (maxSize < 0) {
throw new IllegalArgumentException("maxSize should be greater than zero");
@ -113,7 +125,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('hsetnx', KEYS[1], 'max-size', ARGV[1]);"
+ "return redis.call('hsetnx', KEYS[1], 'mode', ARGV[2]);",
Collections.singletonList(getOptionsName()), maxSize, mode);
Collections.singletonList(optionsName), maxSize, mode);
}
@Override
@ -136,7 +148,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
List<Object> params = new ArrayList<>(3);
params.add(getOptionsName());
params.add(optionsName);
params.add("max-size");
params.add(maxSize);
params.add("mode");
@ -230,7 +242,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" end;" +
"end;" +
"return 0;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName),
System.currentTimeMillis(), encodeMapValue(value));
}
@ -238,7 +250,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
public RFuture<Map<K, V>> getAllOperationAsync(Set<K> keys) {
List<Object> args = new ArrayList<>(keys.size() + 1);
List<Object> plainKeys = new ArrayList<>(keys);
args.add(System.currentTimeMillis());
encodeMapKeys(args, keys);
@ -250,7 +262,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"local maxSize = tonumber(redis.call('hget', KEYS[5], 'max-size'));" +
"local map = {}; " +
"for i = 1, #ARGV, 1 do " +
" local value = redis.call('hget', KEYS[1], ARGV[i]); " +
" local value = redis.call('hget', KEYS[1], ARGV[i]); " +
" map[i] = false;" +
" if value ~= false then " +
" local key = ARGV[i]; " +
@ -283,7 +295,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" end; " +
"end; " +
"return map;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName),
args.toArray());
}
@ -306,7 +318,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
public RFuture<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
checkKey(key);
checkValue(value);
if (ttl < 0) {
throw new IllegalArgumentException("ttl can't be negative");
}
@ -787,12 +799,12 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
getLastAccessTimeSetName(name), getRemovedChannelName(name), getOptionsName(name)),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), publishCommand);
}
@Override
public void putAll(Map<? extends K, ? extends V> map, long ttl, TimeUnit ttlUnit) {
get(putAllAsync(map, ttl, ttlUnit));
}
@Override
public RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long ttl, TimeUnit ttlUnit) {
if (map.isEmpty()) {
@ -1326,8 +1338,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"end;" +
"return counter;",
Arrays.asList(getRawName(), getTimeoutSetName(), getIdleSetName(),
getLastAccessTimeSetName(), getOptionsName()),
Arrays.asList(getRawName(), timeoutSetName, idleSetName,
lastAccessTimeSetName, optionsName),
args.toArray());
return future;
}
@ -1422,8 +1434,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"end;" +
"return counter;",
Arrays.asList(getRawName(), getTimeoutSetName(), getIdleSetName(),
getLastAccessTimeSetName(), getOptionsName()),
Arrays.asList(getRawName(), timeoutSetName, idleSetName,
lastAccessTimeSetName, optionsName),
args.toArray());
return future;
}
@ -1688,7 +1700,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" end; " +
"end; " +
"return map;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
args.toArray());
}
@ -1697,7 +1709,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
public long remainTimeToLive(K key) {
return get(remainTimeToLiveAsync(key));
}
@Override
public RFuture<Long> remainTimeToLiveAsync(K key) {
checkKey(key);
@ -1720,7 +1732,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate == 92233720368547758 then "
+ "return -1; "
+ "end;"
@ -1739,34 +1751,18 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return prefixName("redisson__timeout__set", name);
}
String getTimeoutSetName() {
return prefixName("redisson__timeout__set", getRawName());
}
String getLastAccessTimeSetName(String name) {
return prefixName("redisson__map_cache__last_access__set", name);
}
String getLastAccessTimeSetName() {
return prefixName("redisson__map_cache__last_access__set", getRawName());
}
String getIdleSetName(String name) {
return prefixName("redisson__idle__set", name);
}
String getIdleSetName() {
return prefixName("redisson__idle__set", getRawName());
}
String getOptionsName() {
return suffixName(getRawName(), "redisson_options");
}
String getOptionsName(String name) {
return suffixName(name, "redisson_options");
}
String getCreatedChannelName(String name) {
return prefixName("redisson_map_cache_created", name);
}
@ -1778,7 +1774,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
String getUpdatedChannelName() {
return prefixName("redisson_map_cache_updated", getRawName());
}
String getUpdatedChannelName(String name) {
return prefixName("redisson_map_cache_updated", name);
}
@ -1786,7 +1782,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
String getExpiredChannelName(String name) {
return prefixName("redisson_map_cache_expired", name);
}
String getExpiredChannelName() {
return prefixName("redisson_map_cache_expired", getRawName());
}
@ -1794,7 +1790,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
String getRemovedChannelName() {
return prefixName("redisson_map_cache_removed", getRawName());
}
String getRemovedChannelName(String name) {
return prefixName("redisson_map_cache_removed", name);
}
@ -1874,8 +1870,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "table.insert(result, val); "
+ "end;"
+ "return result;",
Arrays.asList(getRawName(), getTimeoutSetName(), getIdleSetName(),
getRemovedChannelName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.asList(getRawName(), timeoutSetName, idleSetName,
getRemovedChannelName(), lastAccessTimeSetName, optionsName),
args.toArray());
return future;
}
@ -1917,8 +1913,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "return n; ",
Arrays.asList(getRawName(), getTimeoutSetName(), getIdleSetName(),
getRemovedChannelName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.asList(getRawName(), timeoutSetName, idleSetName,
getRemovedChannelName(), lastAccessTimeSetName, optionsName),
params.toArray());
}
@ -2565,7 +2561,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
Arrays.asList(name, getTimeoutSetName(name), getIdleSetName(name), getUpdatedChannelName(name), getOptionsName(name)),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), publishCommand);
}
@Override
protected RFuture<V> replaceOperationAsync(K key, V value) {
String name = getRawName(key);
@ -2618,7 +2614,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"local maxSize = tonumber(redis.call('hget', KEYS[8], 'max-size'));" +
"local mode = redis.call('hget', KEYS[8], 'mode'); " +
"for i, value in ipairs(ARGV) do "
+ "if i % 2 == 0 then "
+ "if i % 2 == 0 then "
+ "local key = ARGV[i-1];" +
"local v = redis.call('hget', KEYS[1], key);" +
@ -2688,7 +2684,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"local msg = struct.pack('Lc0Lc0Lc0', string.len(key), key, string.len(value), value, string.len(val), val);" +
"redis.call(publishCommand, KEYS[5], msg);" +
"end; " +
" if maxSize ~= nil and maxSize ~= 0 then " +
"if mode == false or mode == 'LRU' then " +
"redis.call('zadd', lastAccessTimeSetName, currentTime, key); " +
@ -2699,11 +2695,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"end;"
+ "end;"
+ "end;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getCreatedChannelName(),
getUpdatedChannelName(), getLastAccessTimeSetName(), getRemovedChannelName(), getOptionsName()),
Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, getCreatedChannelName(),
getUpdatedChannelName(), lastAccessTimeSetName, getRemovedChannelName(), optionsName),
params.toArray());
}
private RFuture<Void> putAllOperationAsync(Map<? extends K, ? extends V> map, long ttl, TimeUnit ttlUnit) {
List<Object> params = new ArrayList<Object>(map.size()*2 + 2);
params.add(System.currentTimeMillis());
@ -2723,7 +2719,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"local maxSize = tonumber(redis.call('hget', KEYS[8], 'max-size'));" +
"local mode = redis.call('hget', KEYS[8], 'mode'); " +
"for i, value in ipairs(ARGV) do "
+ "if i % 2 == 0 then "
+ "if i % 2 == 0 then "
+ "local key = ARGV[i-1];" +
"local v = redis.call('hget', KEYS[1], key);" +
@ -2800,7 +2796,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"local msg = struct.pack('Lc0Lc0Lc0', string.len(key), key, string.len(value), value, string.len(val), val);" +
"redis.call(publishCommand, KEYS[5], msg);" +
"end; " +
" if maxSize ~= nil and maxSize ~= 0 then " +
"if mode == false or mode == 'LRU' then " +
"redis.call('zadd', lastAccessTimeSetName, currentTime, key); " +
@ -2811,14 +2807,14 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"end;"
+ "end;"
+ "end;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getCreatedChannelName(),
getUpdatedChannelName(), getLastAccessTimeSetName(), getRemovedChannelName(), getOptionsName()),
Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, getCreatedChannelName(),
getUpdatedChannelName(), lastAccessTimeSetName, getRemovedChannelName(), optionsName),
params.toArray());
}
private volatile MapCacheEventCodec.OSType osType;
private volatile Codec topicCodec;
@Override
public int addListener(MapEntryListener listener) {
return get(addListenerAsync(listener));
@ -2847,7 +2843,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
topicCodec = new MapCacheEventCodec(codec, osType);
}).thenCompose(r -> {
return commandExecutor.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.HSET_VOID, getOptionsName(), "has-listeners", 1);
return commandExecutor.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.HSET_VOID, optionsName, "has-listeners", 1);
});
}
@ -2922,10 +2918,18 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName());
List<Object> keys = Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName);
return super.sizeInMemoryAsync(keys);
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
String newName = (String) keys.get(1);
List<Object> kks = Arrays.asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName,
newName, getTimeoutSetName(newName), getIdleSetName(newName), getLastAccessTimeSetName(newName), getOptionsName(newName));
return super.copyAsync(kks, database, replace);
}
@Override
public void clear() {
get(clearAsync());
@ -2933,12 +2937,12 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public RFuture<Boolean> clearAsync() {
return deleteAsync(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName());
return deleteAsync(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName);
}
@Override
public RFuture<Boolean> deleteAsync() {
return deleteAsync(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName());
return deleteAsync(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName);
}
@Override
@ -2965,7 +2969,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('pexpire', KEYS[2], ARGV[1]); "
+ "redis.call('pexpire', KEYS[3], ARGV[1]); "
+ "return redis.call('pexpire', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName),
timeUnit.toMillis(timeToLive), param);
}
@ -2993,7 +2997,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('pexpireat', KEYS[2], ARGV[1]); "
+ "redis.call('pexpireat', KEYS[3], ARGV[1]); "
+ "return redis.call('pexpireat', KEYS[1], ARGV[1]); ",
Arrays.asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName),
timestamp, param);
}
@ -3006,19 +3010,19 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" redis.call('zrem', KEYS[4], 92233720368547758, 'redisson__expiretag'); " +
" redis.call('persist', KEYS[4]); " +
"end; " +
"redis.call('zrem', KEYS[2], 'redisson__expiretag'); " +
"redis.call('persist', KEYS[2]); " +
"redis.call('zrem', KEYS[3], 'redisson__expiretag'); " +
"redis.call('persist', KEYS[3]); " +
"return redis.call('persist', KEYS[1]); ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()));
Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName));
}
@Override
public RFuture<Set<K>> readAllKeySetAsync() {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_KEY_SET,
"local s = redis.call('hgetall', KEYS[1]); " +
"local s = redis.call('hgetall', KEYS[1]); " +
"local maxSize = tonumber(redis.call('hget', KEYS[5], 'max-size'));"
+ "local result = {}; "
+ "for i, v in ipairs(s) do "
@ -3053,7 +3057,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "end;" +
"return result;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName),
System.currentTimeMillis());
}
@ -3098,7 +3102,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName),
System.currentTimeMillis(), count);
}
@ -3144,7 +3148,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName),
System.currentTimeMillis(), count);
}
@ -3191,7 +3195,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "end;" +
"return result;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName),
System.currentTimeMillis());
}
@ -3200,7 +3204,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return readAll(RedisCommands.EVAL_MAP);
}
@Override
public RFuture<Collection<V>> readAllValuesAsync() {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST,
@ -3239,7 +3243,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "end;" +
"return result;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()),
Arrays.<Object>asList(getRawName(), timeoutSetName, idleSetName, lastAccessTimeSetName, optionsName),
System.currentTimeMillis());
}

@ -69,6 +69,11 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
"return size; ", Arrays.asList(getRawName()), prefix);
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
throw new UnsupportedOperationException();
}
@Override
public RLock getFairLock(K key) {
String lockName = getLockByMapKey(key, "fairlock");

@ -116,7 +116,7 @@ public abstract class RedissonObject implements RObject {
return getRawName();
}
protected final void setName(String name) {
protected void setName(String name) {
this.name = mapName(name);
}
@ -152,6 +152,89 @@ public abstract class RedissonObject implements RObject {
return get(sizeInMemoryAsync());
}
@Override
public final RFuture<Boolean> copyAsync(String destination) {
return copyAsync(destination, -1);
}
@Override
public final RFuture<Boolean> copyAsync(String destination, int database) {
return copyAsync(Arrays.asList(getRawName(), mapName(destination)), database, false);
}
@Override
public final RFuture<Boolean> copyAndReplaceAsync(String destination) {
return copyAndReplaceAsync(destination, -1);
}
@Override
public final RFuture<Boolean> copyAndReplaceAsync(String destination, int database) {
return copyAsync(Arrays.asList(getRawName(), mapName(destination)), database, true);
}
protected RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
return copyAsync(commandExecutor, keys, database, replace);
}
private RFuture<Boolean> copyAsync(CommandAsyncExecutor commandExecutor, List<Object> keys,
int database, boolean replace) {
if (keys.size() == 2) {
List<Object> args = new LinkedList<>();
args.add(keys.get(0));
args.add(keys.get(1));
if (database > 0) {
args.add("DB");
args.add(database);
}
if (replace) {
args.add("REPLACE");
}
return commandExecutor.writeAsync((String) keys.get(0), StringCodec.INSTANCE, RedisCommands.COPY, args.toArray());
}
return commandExecutor.evalWriteAsync((String) keys.get(0), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local res = 0;" +
"local newKeysIndex = #KEYS/2; "
+ "for j = 1, newKeysIndex, 1 do " +
"if tonumber(ARGV[1]) >= 0 then "
+ "if ARGV[2] == '1' then "
+ "res = res + redis.call('copy', KEYS[j], KEYS[newKeysIndex + j], 'db', ARGV[1], 'replace'); "
+ "else "
+ "res = res + redis.call('copy', KEYS[j], KEYS[newKeysIndex + j], 'db', ARGV[1]); "
+ "end; "
+ "end; "
+ "if ARGV[2] == '1' then "
+ "res = res + redis.call('copy', KEYS[j], KEYS[newKeysIndex + j], 'replace'); "
+ "else "
+ "res = res + redis.call('copy', KEYS[j], KEYS[newKeysIndex + j]); "
+ "end; "
+ "end; "
+ "return math.min(res, 1); ",
keys,
database, Boolean.compare(replace, false));
}
@Override
public final boolean copy(String destination) {
return get(copyAsync(destination));
}
@Override
public final boolean copy(String destination, int database) {
return get(copyAsync(destination, database));
}
@Override
public final boolean copyAndReplace(String destination) {
return get(copyAndReplaceAsync(destination));
}
@Override
public final boolean copyAndReplace(String destination, int database) {
return get(copyAndReplaceAsync(destination, database));
}
protected final String mapName(String name) {
return commandExecutor.getServiceManager().getConfig().getNameMapper().map(name);
}
@ -241,7 +324,7 @@ public abstract class RedissonObject implements RObject {
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.DEL_BOOL, getRawName());
}
protected RFuture<Boolean> deleteAsync(String... keys) {
protected final RFuture<Boolean> deleteAsync(String... keys) {
return commandExecutor.writeAsync(keys[0], StringCodec.INSTANCE, RedisCommands.DEL_OBJECTS, keys);
}

@ -685,7 +685,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
List<Object> keys = Arrays.<Object>asList(getRawName(), timeoutName);
return super.sizeInMemoryAsync(keys);
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> deleteAsync() {
return deleteAsync(getRawName(), timeoutName);

@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@ -74,6 +75,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
private volatile Timeout timeoutTask;
private final RStream<String, Object> stream;
private final AtomicBoolean subscribed = new AtomicBoolean();
private final String timeoutName;
public RedissonReliableTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name, String subscriberId) {
super(codec, commandExecutor, name);
@ -82,14 +84,15 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
subscriberId = getServiceManager().generateId();
}
this.subscriberId = subscriberId;
this.timeoutName = getTimeout(getRawName());
}
public RedissonReliableTopic(CommandAsyncExecutor commandExecutor, String name, String subscriberId) {
this(commandExecutor.getServiceManager().getCfg().getCodec(), commandExecutor, name, subscriberId);
}
private String getTimeout() {
return suffixName(getRawName(), "timeout");
private String getTimeout(String name) {
return suffixName(name, "timeout");
}
@Override
@ -153,7 +156,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
RFuture<Void> addFuture = commandExecutor.evalWriteNoRetryAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"redis.call('zadd', KEYS[2], ARGV[3], ARGV[2]);" +
"redis.call('xgroup', 'create', KEYS[1], ARGV[2], ARGV[1], 'MKSTREAM'); ",
Arrays.asList(getRawName(), getTimeout()),
Arrays.asList(getRawName(), timeoutName),
StreamMessageId.ALL, subscriberId, System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout());
CompletionStage<String> f = addFuture.thenApply(r -> {
@ -245,7 +248,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
+ "redis.call('xtrim', KEYS[1], 'maxlen', #range); "
+ "end;"
+ "return r ~= false; ",
Arrays.asList(getRawName(), getTimeout()),
Arrays.asList(getRawName(), timeoutName),
id, time);
updateFuture.whenComplete((re, exc) -> {
@ -270,27 +273,35 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
@Override
public RFuture<Boolean> deleteAsync() {
return deleteAsync(getRawName(), getTimeout());
return deleteAsync(getRawName(), timeoutName);
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
return super.sizeInMemoryAsync(Arrays.asList(getRawName(), getTimeout()));
return super.sizeInMemoryAsync(Arrays.asList(getRawName(), timeoutName));
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
String newName = (String) keys.get(1);
List<Object> kks = Arrays.asList(getRawName(), timeoutName,
newName, getTimeout(newName));
return super.copyAsync(kks, database, replace);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), getTimeout());
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), timeoutName);
}
@Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
return super.expireAtAsync(timestamp, param, getRawName(), getTimeout());
return super.expireAtAsync(timestamp, param, getRawName(), timeoutName);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
return clearExpireAsync(getRawName(), getTimeout());
return clearExpireAsync(getRawName(), timeoutName);
}
@Override
@ -311,7 +322,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
return commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"redis.call('xgroup', 'destroy', KEYS[1], ARGV[1]); "
+ "redis.call('zrem', KEYS[2], ARGV[1]); ",
Arrays.asList(getRawName(), getTimeout()),
Arrays.asList(getRawName(), timeoutName),
subscriberId);
}
@ -340,7 +351,7 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
+ "end; "
+ "redis.call('zadd', KEYS[1], ARGV[1], ARGV[2]); "
+ "return 1; ",
Arrays.asList(getTimeout()),
Arrays.asList(timeoutName),
System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout(), subscriberId);
future.whenComplete((res, e) -> {
if (e != null) {

@ -133,6 +133,11 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return super.sizeInMemoryAsync(keys);
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Integer> sizeAsync() {
return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_INTEGER,

@ -45,13 +45,15 @@ import java.util.stream.Stream;
public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTimeSeries<V, L> {
private final EvictionScheduler evictionScheduler;
private final String timeoutSetName;
public RedissonTimeSeries(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
this.evictionScheduler = evictionScheduler;
this.timeoutSetName = getTimeoutSetName(getRawName());
if (evictionScheduler != null) {
evictionScheduler.scheduleTimeSeries(getRawName(), getTimeoutSetName());
evictionScheduler.scheduleTimeSeries(getRawName(), timeoutSetName);
}
}
@ -59,13 +61,14 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
super(codec, connectionManager, name);
this.evictionScheduler = evictionScheduler;
this.timeoutSetName = getTimeoutSetName(getRawName());
if (evictionScheduler != null) {
evictionScheduler.scheduleTimeSeries(getRawName(), getTimeoutSetName());
evictionScheduler.scheduleTimeSeries(getRawName(), timeoutSetName);
}
}
String getTimeoutSetName() {
return prefixName("redisson__ts_ttl", getRawName());
String getTimeoutSetName(String name) {
return prefixName("redisson__ts_ttl", name);
}
@Override
@ -168,7 +171,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"redis.call('zadd', KEYS[1], ARGV[i], val); " +
"redis.call('zadd', KEYS[2], ARGV[1], val); " +
"end; ",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
params.toArray());
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
@ -182,7 +185,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"redis.call('zadd', KEYS[1], ARGV[i], val); " +
"redis.call('zadd', KEYS[2], expirationTime + 1, val); " +
"end; ",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
params.toArray());
}
@ -239,7 +242,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"redis.call('zadd', KEYS[1], ARGV[i], val); " +
"redis.call('zadd', KEYS[2], ARGV[1], val); " +
"end; ",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
params.toArray());
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
@ -256,7 +259,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"redis.call('zadd', KEYS[1], ARGV[i], val); " +
"redis.call('zadd', KEYS[2], expirationTime + 1, val); " +
"end; ",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
params.toArray());
}
@ -270,7 +273,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
return commandExecutor.evalReadAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local values = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]);" +
"return redis.call('zcard', KEYS[1]) - #values;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis());
}
@ -293,7 +296,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"end;" +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[1]); " +
"return val;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), timestamp);
}
@ -319,7 +322,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"return {n, ARGV[2], val};" +
"end;" +
"return {n, ARGV[2], val, label};",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), timestamp);
}
@ -343,7 +346,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"redis.call('zrem', KEYS[2], values[1]); " +
"redis.call('zrem', KEYS[1], values[1]); " +
"return 1;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), timestamp);
}
@ -368,7 +371,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"redis.call('zrem', KEYS[1], values[1]); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[1]); " +
"return val;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), timestamp);
}
@ -396,7 +399,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"return {n, ARGV[2], val};" +
"end;" +
"return {n, ARGV[2], val, label};",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), timestamp);
}
@ -515,7 +518,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"table.insert(result, t);" +
"end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), startScore, limit);
}
@ -534,7 +537,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"table.insert(result, val);" +
"end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), startScore, limit);
}
@ -560,7 +563,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"table.insert(result, score);" +
"end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), startScore, limit);
}
@ -584,7 +587,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"end;" +
"end;" +
"return counter;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), startTimestamp, endTimestamp);
}
@ -671,7 +674,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"from = '(' .. values[#values];" +
"limit = tonumber(ARGV[4]) - #result/4;" +
"end;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), startTimestamp, endTimestamp, limit, Boolean.compare(reverse, false), encode((Object) null));
}
@ -741,7 +744,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"from = '(' .. values[#values];" +
"limit = tonumber(ARGV[4]) - #result;" +
"end;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), startTimestamp, endTimestamp, limit, Boolean.compare(reverse, false));
}
@ -876,7 +879,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"table.insert(result, val);" +
"end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), startScore, limit);
}
@ -904,7 +907,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"table.insert(result, score);" +
"end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName()),
Arrays.asList(getRawName(), timeoutSetName),
System.currentTimeMillis(), startScore, limit);
}
@ -937,7 +940,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
"end;"
+ "return {tostring(nextPos), result};",
Arrays.asList(name, getTimeoutSetName()),
Arrays.asList(name, timeoutSetName),
params.toArray());
}
@ -982,28 +985,36 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
@Override
public RFuture<Boolean> deleteAsync() {
return deleteAsync(getRawName(), getTimeoutSetName());
return deleteAsync(getRawName(), timeoutSetName);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), getTimeoutSetName());
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), timeoutSetName);
}
@Override
protected RFuture<Boolean> expireAtAsync(long timestamp, String param, String... keys) {
return super.expireAtAsync(timestamp, getRawName(), getTimeoutSetName());
return super.expireAtAsync(timestamp, getRawName(), timeoutSetName);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
return clearExpireAsync(getRawName(), getTimeoutSetName());
return clearExpireAsync(getRawName(), timeoutSetName);
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.asList(getRawName(), getTimeoutSetName());
List<Object> keys = Arrays.asList(getRawName(), timeoutSetName);
return super.sizeInMemoryAsync(keys);
}
@Override
public RFuture<Boolean> copyAsync(List<Object> keys, int database, boolean replace) {
String newName = (String) keys.get(1);
List<Object> kks = Arrays.asList(getRawName(), timeoutSetName,
newName, getTimeoutSetName(newName));
return super.copyAsync(kks, database, replace);
}
}

@ -35,7 +35,7 @@ public interface RObject extends RObjectAsync {
Long getIdleTime();
/**
* Returns bytes amount used by object in Redis memory.
* Returns bytes amount used by object in Redis memory.
*
* @return size in bytes
*/
@ -106,7 +106,41 @@ public interface RObject extends RObjectAsync {
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
*/
void copy(String host, int port, int database, long timeout);
/**
* Copy this object instance to the new instance with a defined name.
*
* @param destination name of the destination instance
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
boolean copy(String destination);
/**
* Copy this object instance to the new instance with a defined name and database.
*
* @param destination name of the destination instance
* @param database database number
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
boolean copy(String destination, int database);
/**
* Copy this object instance to the new instance with a defined name, and replace it if it already exists.
*
* @param destination name of the destination instance
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
boolean copyAndReplace(String destination);
/**
* Copy this object instance to the new instance with a defined name and database, and replace it if it already exists.
*
* @param destination name of the destination instance
* @param database database number
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
boolean copyAndReplace(String destination, int database);
/**
* Move object to another database
*

@ -112,7 +112,41 @@ public interface RObjectAsync {
* @return void
*/
RFuture<Void> copyAsync(String host, int port, int database, long timeout);
/**
* Copy this object instance to the new instance with a defined name.
*
* @param destination name of the destination instance
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
RFuture<Boolean> copyAsync(String destination);
/**
* Copy this object instance to the new instance with a defined name and database.
*
* @param destination name of the destination instance
* @param database database number
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
RFuture<Boolean> copyAsync(String destination, int database);
/**
* Copy this object instance to the new instance with a defined name, and replace it if it already exists.
*
* @param destination name of the destination instance
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
RFuture<Boolean> copyAndReplaceAsync(String destination);
/**
* Copy this object instance to the new instance with a defined name and database, and replace it if it already exists.
*
* @param destination name of the destination instance
* @param database database number
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
RFuture<Boolean> copyAndReplaceAsync(String destination, int database);
/**
* Move object to another database in async mode
*

@ -117,7 +117,41 @@ public interface RObjectReactive {
* @return void
*/
Mono<Void> copy(String host, int port, int database, long timeout);
/**
* Copy this object instance to the new instance with a defined name.
*
* @param destination name of the destination instance
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
Mono<Boolean> copy(String destination);
/**
* Copy this object instance to the new instance with a defined name and database.
*
* @param destination name of the destination instance
* @param database database number
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
Mono<Boolean> copy(String destination, int database);
/**
* Copy this object instance to the new instance with a defined name, and replace it if it already exists.
*
* @param destination name of the destination instance
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
Mono<Boolean> copyAndReplace(String destination);
/**
* Copy this object instance to the new instance with a defined name and database, and replace it if it already exists.
*
* @param destination name of the destination instance
* @param database database number
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
Mono<Boolean> copyAndReplace(String destination, int database);
/**
* Transfer a object from a source Redis instance to a destination Redis instance
* in mode

@ -118,7 +118,41 @@ public interface RObjectRx {
* @return void
*/
Completable copy(String host, int port, int database, long timeout);
/**
* Copy this object instance to the new instance with a defined name.
*
* @param destination name of the destination instance
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
Single<Boolean> copy(String destination);
/**
* Copy this object instance to the new instance with a defined name and database.
*
* @param destination name of the destination instance
* @param database database number
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
Single<Boolean> copy(String destination, int database);
/**
* Copy this object instance to the new instance with a defined name, and replace it if it already exists.
*
* @param destination name of the destination instance
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
Single<Boolean> copyAndReplace(String destination);
/**
* Copy this object instance to the new instance with a defined name and database, and replace it if it already exists.
*
* @param destination name of the destination instance
* @param database database number
* @return <code>true</code> if this object instance was copied else <code>false</code>
*/
Single<Boolean> copyAndReplace(String destination, int database);
/**
* Transfer a object from a source Redis instance to a destination Redis instance
* in mode

@ -1098,7 +1098,17 @@ public abstract class BaseMapTest extends RedisDockerTest {
assertThat(map.valueSize("1")).isEqualTo(5);
destroy(map);
}
@Test
public void testCopy() {
RMap<String, String> map = getMap("test");
map.put("1", "2");
map.copy("test2");
RMap<String, String> mapCopy = getMap("test2");
assertThat(mapCopy.get("1")).isEqualTo("2");
}
@Test
public void testGetAllOrder() {
RMap<Integer, Integer> map = getMap("getAll");

@ -49,7 +49,7 @@ public class RedisDockerTest {
}
protected static GenericContainer<?> createRedis(String... params) {
return createRedisWithVersion("redis:7.4-rc1", params);
return createRedisWithVersion("redis:latest", params);
}
static {

@ -629,6 +629,16 @@ public class RedissonBucketTest extends RedisDockerTest {
});
}
@Test
public void testCopy2() {
RBucket<String> bucket = redisson.getBucket("test");
bucket.set("someValue");
bucket.copy("test2");
RBucket<String> bucket2 = redisson.getBucket("test2");
assertThat(bucket2.get()).isEqualTo("someValue");
}
@Test
public void testRename() {
RBucket<String> bucket = redisson.getBucket("test");

Loading…
Cancel
Save