put methods with maxIdleTime param added to RMapCache. #372

pull/382/head
Nikita 9 years ago
parent f356a53a10
commit e97ba8ef26

@ -48,16 +48,18 @@ public class EvictionScheduler {
final String name; final String name;
final String timeoutSetName; final String timeoutSetName;
final String maxIdleSetName;
final Deque<Integer> sizeHistory = new LinkedList<Integer>(); final Deque<Integer> sizeHistory = new LinkedList<Integer>();
int delay = 10; int delay = 10;
int minDelay = 5; final int minDelay = 1;
int maxDelay = 2*60*60; final int maxDelay = 2*60*60;
int keysLimit = 500; final int keysLimit = 300;
public RedissonCacheTask(String name, String timeoutSetName) { public RedissonCacheTask(String name, String timeoutSetName, String maxIdleSetName) {
this.name = name; this.name = name;
this.timeoutSetName = timeoutSetName; this.timeoutSetName = timeoutSetName;
this.maxIdleSetName = maxIdleSetName;
} }
public void schedule() { public void schedule() {
@ -66,7 +68,7 @@ public class EvictionScheduler {
@Override @Override
public void run() { public void run() {
Future<Integer> future = cleanupExpiredEntires(name, timeoutSetName, keysLimit); Future<Integer> future = cleanupExpiredEntires(name, timeoutSetName, maxIdleSetName, keysLimit);
future.addListener(new FutureListener<Integer>() { future.addListener(new FutureListener<Integer>() {
@Override @Override
@ -122,13 +124,20 @@ public class EvictionScheduler {
} }
public void schedule(String name, String timeoutSetName) { public void schedule(String name, String timeoutSetName) {
RedissonCacheTask task = new RedissonCacheTask(name, timeoutSetName); RedissonCacheTask task = new RedissonCacheTask(name, timeoutSetName, null);
RedissonCacheTask prevTask = tasks.putIfAbsent(name, task); RedissonCacheTask prevTask = tasks.putIfAbsent(name, task);
if (prevTask == null) { if (prevTask == null) {
task.schedule(); task.schedule();
} }
} }
public void schedule(String name, String timeoutSetName, String maxIdleSetName) {
RedissonCacheTask task = new RedissonCacheTask(name, timeoutSetName, maxIdleSetName);
RedissonCacheTask prevTask = tasks.putIfAbsent(name, task);
if (prevTask == null) {
task.schedule();
}
}
public void runCleanTask(final String name, String timeoutSetName, long currentDate) { public void runCleanTask(final String name, String timeoutSetName, long currentDate) {
@ -146,7 +155,7 @@ public class EvictionScheduler {
return; return;
} }
Future<Integer> future = cleanupExpiredEntires(name, timeoutSetName, valuesAmountToClean); Future<Integer> future = cleanupExpiredEntires(name, timeoutSetName, null, valuesAmountToClean);
future.addListener(new FutureListener<Integer>() { future.addListener(new FutureListener<Integer>() {
@Override @Override
@ -166,7 +175,24 @@ public class EvictionScheduler {
}); });
} }
private Future<Integer> cleanupExpiredEntires(String name, String timeoutSetName, int keysLimit) { private Future<Integer> cleanupExpiredEntires(String name, String timeoutSetName, String maxIdleSetName, int keysLimit) {
if (maxIdleSetName != null) {
return executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredKeys1 = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredKeys1 > 0 then "
+ "redis.call('zrem', KEYS[3], unpack(expiredKeys1)); "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys1)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys1)); "
+ "end; "
+ "local expiredKeys2 = redis.call('zrangebyscore', KEYS[3], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredKeys2 > 0 then "
+ "redis.call('zrem', KEYS[3], unpack(expiredKeys2)); "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys2)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys2)); "
+ "end; "
+ "return #expiredKeys1 + #expiredKeys2;",
Arrays.<Object>asList(name, timeoutSetName, maxIdleSetName), System.currentTimeMillis(), keysLimit);
}
return executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, return executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredKeys > 0 then " + "if #expiredKeys > 0 then "

@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
@ -47,7 +46,6 @@ import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.core.Predicate; import org.redisson.core.Predicate;
import org.redisson.core.RMap; import org.redisson.core.RMap;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
/** /**
@ -61,11 +59,11 @@ import io.netty.util.concurrent.Future;
*/ */
public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> { public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE); static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)); static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP); static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE; static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
protected RedissonMap(CommandAsyncExecutor commandExecutor, String name) { protected RedissonMap(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name);

@ -31,22 +31,18 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.client.protocol.decoder.TTLMapValueReplayDecoder;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.CacheGetAllDecoder; import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.core.RMapCache; import org.redisson.core.RMapCache;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/** /**
* <p>Map-based cache with ability to set TTL for each entry via * <p>Map-based cache with ability to set TTL for each entry via
@ -69,75 +65,95 @@ import io.netty.util.concurrent.Promise;
*/ */
public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCache<K, V> { public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCache<K, V> {
static final RedisCommand<Boolean> EVAL_HSET = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP);
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
private static final RedisCommand<Void> EVAL_HMSET = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 4, ValueType.MAP);
private static final RedisCommand<MapScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapScanResult<Object, Object>>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP); private static final RedisCommand<MapScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapScanResult<Object, Object>>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.MAP); private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE); private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 9, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_FAST_PUT_TTL = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP, ValueType.MAP_VALUE); private static final RedisCommand<Boolean> EVAL_FAST_PUT_TTL = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 9, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<List<Object>> EVAL_GET_TTL = new RedisCommand<List<Object>>("EVAL", new TTLMapValueReplayDecoder<Object>(), 5, ValueType.MAP_KEY, ValueType.MAP_VALUE); private static final RedisCommand<Object> EVAL_GET_TTL = new RedisCommand<Object>("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_KEY); private static final RedisCommand<Boolean> EVAL_CONTAINS_KEY = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP_KEY);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_VALUE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_VALUE); private static final RedisCommand<Boolean> EVAL_CONTAINS_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY); private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY);
private final EvictionScheduler evictionScheduler;
protected RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) { protected RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name);
this.evictionScheduler = evictionScheduler; evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName());
evictionScheduler.schedule(getName(), getTimeoutSetName());
} }
public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) { public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name);
this.evictionScheduler = evictionScheduler; evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName());
evictionScheduler.schedule(getName(), getTimeoutSetName());
} }
@Override @Override
public Future<Boolean> containsKeyAsync(Object key) { public Future<Boolean> containsKeyAsync(Object key) {
Promise<Boolean> result = newPromise(); return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_KEY,
"local value = redis.call('hget', KEYS[1], ARGV[2]); " +
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_KEY,
"local value = redis.call('hexists', KEYS[1], ARGV[1]); " +
"local expireDate = 92233720368547758; " + "local expireDate = 92233720368547758; " +
"if value == 1 then " + "if value ~= false then " +
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[1]); " "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDateScore ~= false then " + "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) " + "expireDate = tonumber(expireDateScore) "
+ "end; " + + "end; "
+ "local t, val = struct.unpack('dLc0', value); "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], ARGV[2], value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), ARGV[2]); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; "
+ "return 1;" +
"end;" + "end;" +
"return {expireDate, value}; ", "return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), key); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), key);
addExpireListener(result, future, new BooleanReplayConvertor(), false);
return result;
} }
@Override @Override
public Future<Boolean> containsValueAsync(Object value) { public Future<Boolean> containsValueAsync(Object value) {
Promise<Boolean> result = newPromise(); return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_VALUE,
"local s = redis.call('hgetall', KEYS[1]); "
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_VALUE, + "for i, v in ipairs(s) do "
"local s = redis.call('hgetall', KEYS[1]);" + + "if i % 2 == 0 then "
"for i, v in ipairs(s) do " + "local t, val = struct.unpack('dLc0', v); "
+ "if i % 2 == 0 and ARGV[1] == v then " + "if ARGV[2] == val then "
+ "local key = s[i-1];" + "local key = s[i-1];" +
+ "local expireDate = redis.call('zscore', KEYS[2], key); " "local expireDate = 92233720368547758; " +
+ "if expireDate == false then " "local expireDateScore = redis.call('zscore', KEYS[2], key); "
+ "expireDate = 92233720368547758 " + "if expireDateScore ~= false then "
+ "else " + "expireDate = tonumber(expireDateScore) "
+ "expireDate = tonumber(expireDate) " + "end; "
+ "end; " + "if t ~= 0 then "
+ "return {expireDate, 1}; " + "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "end " + "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; "
+ "return 1; "
+ "end; "
+ "end; "
+ "end;" + + "end;" +
"return {92233720368547758, 0};", "return 0;",
Arrays.<Object>asList(getName(), getTimeoutSetName()), value); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), value);
addExpireListener(result, future, new BooleanReplayConvertor(), false);
return result;
} }
@Override @Override
@ -150,131 +166,163 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
args.add(System.currentTimeMillis()); args.add(System.currentTimeMillis());
args.addAll(keys); args.addAll(keys);
final Promise<Map<K, V>> result = newPromise(); return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args), 7, ValueType.MAP_KEY, ValueType.MAP_VALUE),
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<List<Object>>("EVAL", new CacheGetAllDecoder(args), 6, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" + "local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" +
"local expireIdleHead = redis.call('zrange', KEYS[3], 0, 0, 'withscores');" +
"local maxDate = table.remove(ARGV, 1); " // index is the first parameter "local maxDate = table.remove(ARGV, 1); " // index is the first parameter
+ "local minExpireDate = 92233720368547758;" + + "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= tonumber(maxDate); "
"if #expireHead == 2 and tonumber(expireHead[2]) <= tonumber(maxDate) then " + "local hasExpireIdle = #expireIdleHead == 2 and tonumber(expireIdleHead[2]) <= tonumber(maxDate); "
+ "for i, key in pairs(ARGV) do " + "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); "
+ "local expireDate = redis.call('zscore', KEYS[2], key); " + "for i = #map, 1, -1 do "
+ "if expireDate ~= false and tonumber(expireDate) <= tonumber(maxDate) then " + "local value = map[i]; "
+ "minExpireDate = math.min(tonumber(expireDate), minExpireDate); " + "if value ~= false then "
+ "ARGV[i] = ARGV[i] .. '__redisson__skip' " + "local key = ARGV[i]; "
+ "end;" + "local t, val = struct.unpack('dLc0', value); "
+ "end;" + "map[i] = val; "
+ "end; " +
"return {minExpireDate, unpack(redis.call('hmget', KEYS[1], unpack(ARGV)))};", + "if hasExpire then "
Arrays.<Object>asList(getName(), getTimeoutSetName()), args.toArray()); + "local expireDate = redis.call('zscore', KEYS[2], key); "
+ "if expireDate ~= false and tonumber(expireDate) <= tonumber(maxDate) then "
future.addListener(new FutureListener<List<Object>>() { + "map[i] = false; "
@Override + "end; "
public void operationComplete(Future<List<Object>> future) throws Exception { + "end; "
if (!future.isSuccess()) {
result.setFailure(future.cause()); + "if hasExpireIdle and t ~= 0 then "
return; + "local expireIdle = redis.call('zscore', KEYS[3], key); "
} + "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "else "
+ "map[i] = false; "
+ "end; "
+ "end; "
+ "end; "
+ "end; "
+ "end; "
+ "return map;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), args.toArray());
List<Object> res = future.getNow(); }
Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
}
result.setSuccess((Map<K, V>) res.get(1));
}
});
return result; @Override
public V putIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit) {
return get(putIfAbsentAsync(key, value, ttl, ttlUnit));
}
@Override
public Future<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit) {
return putIfAbsentAsync(key, value, ttl, ttlUnit, 0, null);
} }
@Override @Override
public V putIfAbsent(K key, V value, long ttl, TimeUnit unit) { public V putIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
return get(putIfAbsentAsync(key, value, ttl, unit)); return get(putIfAbsentAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit));
} }
@Override @Override
public Future<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit unit) { public Future<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
if (ttl < 0) { if (ttl < 0) {
throw new IllegalArgumentException("TTL can't be negative"); throw new IllegalArgumentException("ttl can't be negative");
}
if (maxIdleTime < 0) {
throw new IllegalArgumentException("maxIdleTime can't be negative");
} }
if (ttl == 0) { if (ttl == 0 && maxIdleTime == 0) {
return putIfAbsentAsync(key, value); return putIfAbsentAsync(key, value);
} }
if (unit == null) { if (ttl > 0 && ttlUnit == null) {
throw new NullPointerException("TimeUnit param can't be null"); throw new NullPointerException("ttlUnit param can't be null");
}
if (maxIdleTime > 0 && maxIdleUnit == null) {
throw new NullPointerException("maxIdleUnit param can't be null");
}
long ttlTimeout = 0;
if (ttl > 0) {
ttlTimeout = System.currentTimeMillis() + ttlUnit.toMillis(ttl);
}
long maxIdleTimeout = 0;
long maxIdleDelta = 0;
if (maxIdleTime > 0) {
maxIdleDelta = maxIdleUnit.toMillis(maxIdleTime);
maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta;
} }
long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_TTL, return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_TTL,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 0 then " "if redis.call('hexists', KEYS[1], ARGV[4]) == 0 then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); "
+ "return nil " + "end; "
+ "if tonumber(ARGV[2]) > 0 then "
+ "redis.call('zadd', KEYS[3], ARGV[2], ARGV[4]); "
+ "end; "
+ "local value = struct.pack('dLc0', ARGV[3], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('hset', KEYS[1], ARGV[4], value); "
+ "return nil; "
+ "else " + "else "
+ "return redis.call('hget', KEYS[1], ARGV[2]) " + "local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "if value == false then "
+ "return nil; "
+ "end;"
+ "local t, val = struct.unpack('dLc0', value); "
+ "return val; "
+ "end", + "end",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeoutDate, key, value); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
} }
@Override @Override
public Future<Long> removeAsync(Object key, Object value) { public Future<Long> removeAsync(Object key, Object value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUE, return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " "local value = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('zrem', KEYS[2], ARGV[1]); " + "if value == false then "
+ "return redis.call('hdel', KEYS[1], ARGV[1]); " + "return 0; "
+ "end;"
+ "local t, val = struct.unpack('dLc0', value); "
+ "if val == ARGV[2] then "
+ "redis.call('zrem', KEYS[2], ARGV[1]); "
+ "redis.call('zrem', KEYS[3], ARGV[1]); "
+ "return redis.call('hdel', KEYS[1], ARGV[1]); "
+ "else " + "else "
+ "return 0 " + "return 0 "
+ "end", + "end",
Arrays.<Object>asList(getName(), getTimeoutSetName()), key, value); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), key, value);
} }
@Override @Override
public Future<V> getAsync(K key) { public Future<V> getAsync(K key) {
Promise<V> result = newPromise(); return commandExecutor.evalReadAsync(getName(), codec, EVAL_GET_TTL,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_GET_TTL, + "if value == false then "
"local value = redis.call('hget', KEYS[1], ARGV[1]); " + + "return nil; "
"local expireDate = redis.call('zscore', KEYS[2], ARGV[1]); " + "end; "
+ "if expireDate == false then " + "local t, val = struct.unpack('dLc0', value); "
+ "expireDate = 92233720368547758; " + "local expireDate = 92233720368547758; " +
+ "end; " + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
"return {expireDate, value}; ", + "if expireDateScore ~= false then "
Arrays.<Object>asList(getName(), getTimeoutSetName()), key); + "expireDate = tonumber(expireDateScore) "
+ "end; "
addExpireListener(result, future, null, null); + "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); "
return result; + "if expireIdle ~= false then "
} + "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
private <T> void addExpireListener(final Promise<T> result, Future<List<Object>> future, final Convertor<T> convertor, final T nullValue) { + "redis.call('hset', KEYS[1], ARGV[2], value); "
future.addListener(new FutureListener<List<Object>>() { + "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), ARGV[2]); "
@Override + "end; "
public void operationComplete(Future<List<Object>> future) throws Exception { + "expireDate = math.min(expireDate, tonumber(expireIdle)) "
if (!future.isSuccess()) { + "end; "
result.setFailure(future.cause()); + "end; "
return; + "if expireDate <= tonumber(ARGV[1]) then "
} + "return nil; "
+ "end; "
List<Object> res = future.getNow(); + "return val; ",
Long expireDate = (Long) res.get(0); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), key);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
result.setSuccess(nullValue);
evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
return;
}
if (convertor != null) {
result.setSuccess((T) convertor.convert(res.get(1)));
} else {
result.setSuccess((T) res.get(1));
}
}
});
} }
@Override @Override
@ -283,65 +331,184 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
} }
@Override @Override
public boolean fastPut(K key, V value, long ttl, TimeUnit unit) { public Future<V> putAsync(K key, V value) {
return get(fastPutAsync(key, value, ttl, unit)); return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "redis.call('hset', KEYS[1], ARGV[1], value); "
+ "if v == false then "
+ "return nil; "
+ "end; "
+ "local t, val = struct.unpack('dLc0', v); "
+ "return val; ",
Collections.<Object>singletonList(getName()), key, value);
} }
@Override @Override
public Future<Boolean> fastPutAsync(K key, V value, long ttl, TimeUnit unit) { public Future<V> putIfAbsentAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "if redis.call('hsetnx', KEYS[1], ARGV[1], value) == 1 then "
+ "return nil "
+ "else "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if v == false then "
+ "return nil; "
+ "end; "
+ "local t, val = struct.unpack('dLc0', v); "
+ "return val; "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
}
@Override
public boolean fastPut(K key, V value, long ttl, TimeUnit ttlUnit) {
return get(fastPutAsync(key, value, ttl, ttlUnit));
}
@Override
public Future<Boolean> fastPutAsync(K key, V value, long ttl, TimeUnit ttlUnit) {
return fastPutAsync(key, value, ttl, ttlUnit, 0, null);
}
@Override
public boolean fastPut(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
return get(fastPutAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit));
}
@Override
public Future<Boolean> fastPutAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
if (ttl < 0) { if (ttl < 0) {
throw new IllegalArgumentException("TTL can't be negative"); throw new IllegalArgumentException("ttl can't be negative");
}
if (maxIdleTime < 0) {
throw new IllegalArgumentException("maxIdleTime can't be negative");
} }
if (ttl == 0) { if (ttl == 0 && maxIdleTime == 0) {
return fastPutAsync(key, value); return fastPutAsync(key, value);
} }
if (unit == null) { if (ttl > 0 && ttlUnit == null) {
throw new NullPointerException("TimeUnit param can't be null"); throw new NullPointerException("ttlUnit param can't be null");
}
if (maxIdleTime > 0 && maxIdleUnit == null) {
throw new NullPointerException("maxIdleUnit param can't be null");
}
long ttlTimeout = 0;
if (ttl > 0) {
ttlTimeout = System.currentTimeMillis() + ttlUnit.toMillis(ttl);
}
long maxIdleTimeout = 0;
long maxIdleDelta = 0;
if (maxIdleTime > 0) {
maxIdleDelta = maxIdleUnit.toMillis(maxIdleTime);
maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta;
} }
long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_PUT_TTL, return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_PUT_TTL,
"redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + "if tonumber(ARGV[1]) > 0 then "
"return redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); ", + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); "
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeoutDate, key, value); + "else "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "end; "
+ "if tonumber(ARGV[2]) > 0 then "
+ "redis.call('zadd', KEYS[3], ARGV[2], ARGV[4]); "
+ "else "
+ "redis.call('zrem', KEYS[3], ARGV[4]); "
+ "end; "
+ "local value = struct.pack('dLc0', ARGV[3], string.len(ARGV[5]), ARGV[5]); " +
"return redis.call('hset', KEYS[1], ARGV[4], value); ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
} }
@Override @Override
public Future<V> putAsync(K key, V value, long ttl, TimeUnit unit) { public Future<V> putAsync(K key, V value, long ttl, TimeUnit ttlUnit) {
return putAsync(key, value, ttl, ttlUnit, 0, null);
}
@Override
public V put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
return get(putAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit));
}
@Override
public Future<V> putAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
if (ttl < 0) { if (ttl < 0) {
throw new IllegalArgumentException("TTL can't be negative"); throw new IllegalArgumentException("ttl can't be negative");
}
if (maxIdleTime < 0) {
throw new IllegalArgumentException("maxIdleTime can't be negative");
} }
if (ttl == 0) { if (ttl == 0 && maxIdleTime == 0) {
return putAsync(key, value); return putAsync(key, value);
} }
if (unit == null) { if (ttl > 0 && ttlUnit == null) {
throw new NullPointerException("TimeUnit param can't be null"); throw new NullPointerException("ttlUnit param can't be null");
}
if (maxIdleTime > 0 && maxIdleUnit == null) {
throw new NullPointerException("maxIdleUnit param can't be null");
}
long ttlTimeout = 0;
if (ttl > 0) {
ttlTimeout = System.currentTimeMillis() + ttlUnit.toMillis(ttl);
}
long maxIdleTimeout = 0;
long maxIdleDelta = 0;
if (maxIdleTime > 0) {
maxIdleDelta = maxIdleUnit.toMillis(maxIdleTime);
maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta;
} }
long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_TTL, return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_TTL,
"local v = redis.call('hget', KEYS[1], ARGV[2]); " "local v = redis.call('hget', KEYS[1], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); "
+ "return v", + "else "
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeoutDate, key, value); + "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "end; "
+ "if tonumber(ARGV[2]) > 0 then "
+ "redis.call('zadd', KEYS[3], ARGV[2], ARGV[4]); "
+ "else "
+ "redis.call('zrem', KEYS[3], ARGV[4]); "
+ "end; "
+ "local value = struct.pack('dLc0', ARGV[3], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('hset', KEYS[1], ARGV[4], value); "
+ "if v == false then "
+ "return nil;"
+ "end; "
+ "local t, val = struct.unpack('dLc0', v); "
+ "return val",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
} }
String getTimeoutSetName() { String getTimeoutSetName() {
return "redisson__timeout__set__{" + getName() + "}"; return "redisson__timeout__set__{" + getName() + "}";
} }
String getIdleSetName() {
return "redisson__idle__set__{" + getName() + "}";
}
@Override @Override
public Future<V> removeAsync(K key) { public Future<V> removeAsync(K key) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE, return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); " "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('zrem', KEYS[2], ARGV[1]); " + "redis.call('zrem', KEYS[2], ARGV[1]); "
+ "redis.call('zrem', KEYS[3], ARGV[1]); "
+ "redis.call('hdel', KEYS[1], ARGV[1]); " + "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "return val; "
+ "end; "
+ "return v", + "return v",
Arrays.<Object>asList(getName(), getTimeoutSetName()), key); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), key);
} }
@Override @Override
@ -351,31 +518,141 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
} }
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_REMOVE, return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_REMOVE,
"redis.call('zrem', KEYS[2], unpack(ARGV)); " "redis.call('zrem', KEYS[3], unpack(ARGV)); " +
+ "return redis.call('hdel', KEYS[1], unpack(ARGV)); ", "redis.call('zrem', KEYS[2], unpack(ARGV)); " +
Arrays.<Object>asList(getName(), getTimeoutSetName()), keys); "return redis.call('hdel', KEYS[1], unpack(ARGV)); ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), keys);
} }
@Override @Override
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) { MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), EVAL_HSCAN, Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), EVAL_HSCAN,
"local result = {}; " "local result = {}; "
+ "local res = redis.call('hscan', KEYS[1], ARGV[1]); " + "local res = redis.call('hscan', KEYS[1], ARGV[2]); "
+ "for i, value in ipairs(res[2]) do " + "for i, value in ipairs(res[2]) do "
+ "if i % 2 == 0 then " + "if i % 2 == 0 then "
+ "local key = res[2][i-1]; " + "local key = res[2][i-1]; " +
+ "local expireDate = redis.call('zscore', KEYS[2], key); " "local expireDate = 92233720368547758; " +
+ "if (expireDate == false) or (expireDate ~= false and tonumber(expireDate) > tonumber(ARGV[2])) then " "local expireDateScore = redis.call('zscore', KEYS[2], key); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "local t, val = struct.unpack('dLc0', value); "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then "
+ "table.insert(result, key); " + "table.insert(result, key); "
+ "table.insert(result, value); " + "table.insert(result, val); "
+ "end; " + "end; "
+ "end; " + "end; "
+ "end;" + "end;"
+ "return {res[1], result};", Arrays.<Object>asList(getName(), getTimeoutSetName()), startPos, System.currentTimeMillis()); + "return {res[1], result};", Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), startPos);
return get(f); return get(f);
} }
@Override
public Future<Boolean> fastPutAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HSET,
"local val = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "return redis.call('hset', KEYS[1], ARGV[1], val); ",
Collections.<Object>singletonList(getName()), key, value);
}
@Override
public Future<Boolean> fastPutIfAbsentAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HSET,
"local val = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "return redis.call('hsetnx', KEYS[1], ARGV[1], val); ",
Collections.<Object>singletonList(getName()), key, value);
}
@Override
public Future<Boolean> replaceAsync(K key, V oldValue, V newValue) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if v == false then "
+ "return 0;"
+ "end;"
+ "local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "local t, val = struct.unpack('dLc0', v); "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], ARGV[2], value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), ARGV[2]); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) and val == ARGV[3] then "
+ "local value = struct.pack('dLc0', t, string.len(ARGV[4]), ARGV[4]); "
+ "redis.call('hset', KEYS[1], ARGV[2], value); "
+ "return 1; "
+ "end; "
+ "return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), key, oldValue, newValue);
}
@Override
public Future<V> replaceAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE,
"local v = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "if t ~= 0 then "
+ "t = t + tonumber(ARGV[1]); "
+ "end; "
+ "local value = struct.pack('dLc0', t, string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[2], value); "
+ "return val; "
+ "else "
+ "return nil; "
+ "end",
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), key, value);
}
@Override
public Future<Void> putAllAsync(Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return newSucceededFuture(null);
}
List<Object> params = new ArrayList<Object>(map.size()*2);
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
params.add(t.getKey());
params.add(t.getValue());
}
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HMSET,
"for i, value in ipairs(ARGV) do "
+ "if i % 2 == 0 then "
+ "local val = struct.pack('dLc0', 0, string.len(value), value); "
+ "ARGV[i] = val; "
+ "end;"
+ "end;"
+ "return redis.call('hmset', KEYS[1], unpack(ARGV)); ",
Collections.<Object>singletonList(getName()), params.toArray());
}
@Override @Override
public Future<Boolean> deleteAsync() { public Future<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName());
@ -386,8 +663,10 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" + "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" +
"redis.call('pexpire', KEYS[2], ARGV[1]); " + "redis.call('pexpire', KEYS[2], ARGV[1]); " +
"redis.call('zadd', KEYS[3], 92233720368547758, 'redisson__expiretag');" +
"redis.call('pexpire', KEYS[3], ARGV[1]); " +
"return redis.call('pexpire', KEYS[1], ARGV[1]); ", "return redis.call('pexpire', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeUnit.toMillis(timeToLive)); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), timeUnit.toMillis(timeToLive));
} }
@Override @Override
@ -395,8 +674,10 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" + "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" +
"redis.call('pexpireat', KEYS[2], ARGV[1]); " + "redis.call('pexpireat', KEYS[2], ARGV[1]); " +
"redis.call('zadd', KEYS[3], 92233720368547758, 'redisson__expiretag');" +
"redis.call('pexpire', KEYS[3], ARGV[1]); " +
"return redis.call('pexpireat', KEYS[1], ARGV[1]); ", "return redis.call('pexpireat', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timestamp); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), timestamp);
} }
@Override @Override
@ -404,8 +685,10 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zrem', KEYS[2], 'redisson__expiretag'); " + "redis.call('zrem', KEYS[2], 'redisson__expiretag'); " +
"redis.call('persist', KEYS[2]); " + "redis.call('persist', KEYS[2]); " +
"redis.call('zrem', KEYS[3], 'redisson__expiretag'); " +
"redis.call('persist', KEYS[3]); " +
"return redis.call('persist', KEYS[1]); ", "return redis.call('persist', KEYS[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName())); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()));
} }
} }

@ -44,6 +44,7 @@ public interface RMapCache<K, V> extends RMap<K, V>, RMapCacheAsync<K, V> {
* <p/> * <p/>
* Stores value mapped by key with specified time to live. * Stores value mapped by key with specified time to live.
* Entry expires after specified time to live. * Entry expires after specified time to live.
* <p/>
* If the map previously contained a mapping for * If the map previously contained a mapping for
* the key, the old value is replaced by the specified value. * the key, the old value is replaced by the specified value.
* *
@ -51,14 +52,41 @@ public interface RMapCache<K, V> extends RMap<K, V>, RMapCacheAsync<K, V> {
* @param value * @param value
* @param ttl - time to live for key\value entry. * @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely. * If <code>0</code> then stores infinitely.
* @param unit * @param ttlUnit
* @return previous associated value * @return previous associated value
*/ */
V putIfAbsent(K key, V value, long ttl, TimeUnit unit); V putIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit);
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* <p/>
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p/>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key
* @param value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit
* <p/>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return previous associated value
*/
V putIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/** /**
* Stores value mapped by key with specified time to live. * Stores value mapped by key with specified time to live.
* Entry expires after specified time to live. * Entry expires after specified time to live.
* <p/>
* If the map previously contained a mapping for * If the map previously contained a mapping for
* the key, the old value is replaced by the specified value. * the key, the old value is replaced by the specified value.
* *
@ -71,7 +99,73 @@ public interface RMapCache<K, V> extends RMap<K, V>, RMapCacheAsync<K, V> {
*/ */
V put(K key, V value, long ttl, TimeUnit unit); V put(K key, V value, long ttl, TimeUnit unit);
boolean fastPut(K key, V value, long ttl, TimeUnit unit); /**
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p/>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key
* @param value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit
* <p/>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return previous associated value
*/
V put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* Stores value mapped by key with specified time to live.
* Entry expires after specified time to live.
* <p/>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
* <p/>
* Works faster than usual {@link #put(Object, Object, long, TimeUnit)}
* as it not returns previous value.
*
* @param key
* @param value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit
* @return previous associated value
*/
boolean fastPut(K key, V value, long ttl, TimeUnit ttlUnit);
/**
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p/>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
* <p/>
* Works faster than usual {@link #put(Object, Object, long, TimeUnit, long, TimeUnit)}
* as it not returns previous value.
*
* @param key
* @param value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit
* <p/>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
* @return previous associated value
*/
boolean fastPut(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/** /**
* Returns the number of entries in cache. * Returns the number of entries in cache.

@ -58,6 +58,32 @@ public interface RMapCacheAsync<K, V> extends RMapAsync<K, V> {
*/ */
Future<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit unit); Future<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit unit);
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* <p/>
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p/>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key
* @param value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit
* <p/>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return previous associated value
*/
Future<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/** /**
* Stores value mapped by key with specified time to live. * Stores value mapped by key with specified time to live.
* Entry expires after specified time to live. * Entry expires after specified time to live.
@ -73,8 +99,74 @@ public interface RMapCacheAsync<K, V> extends RMapAsync<K, V> {
*/ */
Future<V> putAsync(K key, V value, long ttl, TimeUnit unit); Future<V> putAsync(K key, V value, long ttl, TimeUnit unit);
/**
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p/>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key
* @param value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit
* <p/>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return previous associated value
*/
Future<V> putAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* Stores value mapped by key with specified time to live.
* Entry expires after specified time to live.
* <p/>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
* <p/>
* Works faster than usual {@link #put(Object, Object, long, TimeUnit)}
* as it not returns previous value.
*
* @param key
* @param value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit
* @return previous associated value
*/
Future<Boolean> fastPutAsync(K key, V value, long ttl, TimeUnit unit); Future<Boolean> fastPutAsync(K key, V value, long ttl, TimeUnit unit);
/**
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p/>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
* <p/>
* Works faster than usual {@link #put(Object, Object, long, TimeUnit, long, TimeUnit)}
* as it not returns previous value.
*
* @param key
* @param value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit
* <p/>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
* @return previous associated value
*/
Future<Boolean> fastPutAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/** /**
* Returns the number of entries in cache. * Returns the number of entries in cache.
* This number can reflects expired entries too * This number can reflects expired entries too

@ -144,8 +144,6 @@ public class RedissonMapCacheTest extends BaseTest {
Map<Integer, Integer> filteredAgain = map.getAll(new HashSet<Integer>(Arrays.asList(2, 3, 5))); Map<Integer, Integer> filteredAgain = map.getAll(new HashSet<Integer>(Arrays.asList(2, 3, 5)));
Assert.assertTrue(filteredAgain.isEmpty()); Assert.assertTrue(filteredAgain.isEmpty());
Thread.sleep(100);
Assert.assertEquals(2, map.size());
} }
@Test @Test
@ -273,8 +271,6 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertFalse(map.keySet().contains(new SimpleKey("33"))); Assert.assertFalse(map.keySet().contains(new SimpleKey("33")));
Assert.assertFalse(map.keySet().contains(new SimpleKey("44"))); Assert.assertFalse(map.keySet().contains(new SimpleKey("44")));
Assert.assertTrue(map.keySet().contains(new SimpleKey("1"))); Assert.assertTrue(map.keySet().contains(new SimpleKey("1")));
Thread.sleep(50);
Assert.assertEquals(1, map.size());
} }
@Test @Test
@ -292,7 +288,6 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertFalse(map.values().contains(new SimpleValue("44"))); Assert.assertFalse(map.values().contains(new SimpleValue("44")));
Assert.assertFalse(map.values().contains(new SimpleValue("33"))); Assert.assertFalse(map.values().contains(new SimpleValue("33")));
Assert.assertTrue(map.values().contains(new SimpleValue("2"))); Assert.assertTrue(map.values().contains(new SimpleValue("2")));
Assert.assertEquals(1, map.size());
} }
@Test @Test
@ -307,8 +302,6 @@ public class RedissonMapCacheTest extends BaseTest {
Thread.sleep(1000); Thread.sleep(1000);
Assert.assertFalse(map.containsValue(new SimpleValue("44"))); Assert.assertFalse(map.containsValue(new SimpleValue("44")));
Thread.sleep(50);
Assert.assertEquals(0, map.size());
} }
@Test @Test
@ -322,8 +315,6 @@ public class RedissonMapCacheTest extends BaseTest {
Thread.sleep(1000); Thread.sleep(1000);
Assert.assertFalse(map.containsKey(new SimpleKey("33"))); Assert.assertFalse(map.containsKey(new SimpleKey("33")));
Thread.sleep(50);
Assert.assertEquals(0, map.size());
} }
@Test @Test
@ -417,6 +408,10 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertNull(map.get(new SimpleKey("33"))); Assert.assertNull(map.get(new SimpleKey("33")));
map.put(new SimpleKey("33"), new SimpleValue("44"), 5, TimeUnit.SECONDS); map.put(new SimpleKey("33"), new SimpleValue("44"), 5, TimeUnit.SECONDS);
map.put(new SimpleKey("10"), new SimpleValue("32"), 5, TimeUnit.SECONDS, 2, TimeUnit.SECONDS);
map.put(new SimpleKey("01"), new SimpleValue("92"), 0, null, 2, TimeUnit.SECONDS);
Assert.assertEquals(3, map.size());
Thread.sleep(11000); Thread.sleep(11000);
@ -444,8 +439,6 @@ public class RedissonMapCacheTest extends BaseTest {
Thread.sleep(1000); Thread.sleep(1000);
Assert.assertNull(map.get(new SimpleKey("33"))); Assert.assertNull(map.get(new SimpleKey("33")));
Thread.sleep(50);
Assert.assertEquals(0, map.size());
} }
@Test @Test
@ -582,6 +575,22 @@ public class RedissonMapCacheTest extends BaseTest {
// Assert.assertEquals(0, values.size()); // Assert.assertEquals(0, values.size());
// } // }
@Test
public void testFastPutIfAbsent() throws Exception {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");
SimpleKey key = new SimpleKey("1");
SimpleValue value = new SimpleValue("2");
map.put(key, value);
assertThat(map.fastPutIfAbsent(key, new SimpleValue("3"))).isFalse();
assertThat(map.get(key)).isEqualTo(value);
SimpleKey key1 = new SimpleKey("2");
SimpleValue value1 = new SimpleValue("4");
assertThat(map.fastPutIfAbsent(key1, value1)).isTrue();
assertThat(map.get(key1)).isEqualTo(value1);
}
@Test @Test
public void testFastPut() throws Exception { public void testFastPut() throws Exception {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple"); RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
@ -590,6 +599,54 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertEquals(1, map.size()); Assert.assertEquals(1, map.size());
} }
@Test
public void testPutIdle() throws InterruptedException {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
map.put(1, 2, 0, null, 2, TimeUnit.SECONDS);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(2100);
assertThat(map.get(1)).isNull();
}
@Test
public void testPutIfAbsentIdle() throws InterruptedException {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
assertThat(map.putIfAbsent(1, 2, 0, null, 2, TimeUnit.SECONDS)).isNull();
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(2100);
assertThat(map.get(1)).isNull();
}
@Test
public void testFastPutIdle() throws InterruptedException {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
assertThat(map.fastPut(1, 2, 0, null, 2, TimeUnit.SECONDS)).isTrue();
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(1000);
assertThat(map.get(1)).isEqualTo(2);
Thread.sleep(2100);
assertThat(map.get(1)).isNull();
}
@Test @Test
public void testFastPutWithTTL() throws Exception { public void testFastPutWithTTL() throws Exception {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple"); RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
@ -598,6 +655,14 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertEquals(1, map.size()); Assert.assertEquals(1, map.size());
} }
@Test
public void testFastPutWithTTLandMaxIdle() throws Exception {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
Assert.assertTrue(map.fastPut(1, 2, 200, TimeUnit.SECONDS, 100, TimeUnit.SECONDS));
Assert.assertFalse(map.fastPut(1, 2, 200, TimeUnit.SECONDS, 100, TimeUnit.SECONDS));
Assert.assertEquals(1, map.size());
}
@Test @Test
public void testEquals() { public void testEquals() {
RMapCache<String, String> map = redisson.getMapCache("simple"); RMapCache<String, String> map = redisson.getMapCache("simple");

Loading…
Cancel
Save