RedissonMapCache bug fixes #1027

pull/1038/merge
Nikita 8 years ago
parent deea34a82f
commit 727c4523cb

@ -86,13 +86,13 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor,
String name, RedissonClient redisson, MapOptions<K, V> options) {
super(commandExecutor, name, redisson, options);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName());
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName(), getLastAccessTimeSetName());
}
public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor,
String name, RedissonClient redisson, MapOptions<K, V> options) {
super(codec, commandExecutor, name, redisson, options);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName());
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName(), getLastAccessTimeSetName());
}
@Override
@ -152,7 +152,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
" return 1;" +
"end;" +
"return 0; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getLastAccessTimeSetNameByKey(key), getOptionsName()),
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getLastAccessTimeSetNameByKey(key), getOptionsName(key)),
System.currentTimeMillis(), encodeMapKey(key));
}
@ -486,7 +486,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
protected RFuture<V> putOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[2]);" +
"local v = redis.call('hget', KEYS[1], ARGV[2]);" +
"local exists = false;" +
"if v ~= false then" +
" local t, val = struct.unpack('dLc0', v);" +
@ -1083,17 +1083,14 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('zrem', KEYS[2], ARGV[2]); "
+ "redis.call('zrem', KEYS[3], ARGV[2]); "
+ "local maxSize = tonumber(redis.call('hget', KEYS[6], 'max-size'));"
+ "if maxSize ~= nil and maxSize ~= 0 then"
+ " redis.call('zrem', KEYS[5], ARGV[2]); "
+ "end;"
+ "redis.call('zrem', KEYS[5], ARGV[2]); "
+ "redis.call('hdel', KEYS[1], ARGV[2]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(val), val); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return val; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelNameByKey(key),
getLastAccessTimeSetNameByKey(key), getOptionsName(key)),
getLastAccessTimeSetNameByKey(key)),
System.currentTimeMillis(), encodeMapKey(key));
}
@ -1601,7 +1598,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public RFuture<Void> putAllOperationAsync(Map<? extends K, ? extends V> map) {
List<Object> params = new ArrayList<Object>(map.size()*2);
List<Object> params = new ArrayList<Object>(map.size()*2 + 1);
params.add(System.currentTimeMillis());
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
if (t.getKey() == null) {
throw new NullPointerException("map key can't be null");
@ -1615,17 +1613,72 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local currentTime = tonumber(table.remove(ARGV, 1)); " + // index is the first parameter
"local maxSize = tonumber(redis.call('hget', KEYS[8], 'max-size'));" +
"for i, value in ipairs(ARGV) do "
+ "if i % 2 == 0 then "
+ "local val = struct.pack('dLc0', 0, string.len(value), value); "
+ "local key = ARGV[i-1];"
+ "redis.call('hmset', KEYS[1], key, val); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value); "
+ "redis.call('publish', KEYS[2], msg); "
+ "if i % 2 == 0 then "
+ "local key = ARGV[i-1];" +
"local v = redis.call('hget', KEYS[1], key);" +
"local exists = false;" +
"if v ~= false then" +
" local t, val = struct.unpack('dLc0', v);" +
" local expireDate = 92233720368547758;" +
" local expireDateScore = redis.call('zscore', KEYS[2], key);" +
" if expireDateScore ~= false then" +
" expireDate = tonumber(expireDateScore)" +
" end;" +
" if t ~= 0 then" +
" local expireIdle = redis.call('zscore', KEYS[3], key);" +
" if expireIdle ~= false then" +
" expireDate = math.min(expireDate, tonumber(expireIdle))" +
" end;" +
" end;" +
" if expireDate > tonumber(currentTime) then" +
" exists = true;" +
" end;" +
"end;" +
"" +
"local newvalue = struct.pack('dLc0', 0, string.len(value), value);" +
"redis.call('hset', KEYS[1], key, newvalue);" +
"local lastAccessTimeSetName = KEYS[6];" +
"if exists == false then" +
" if maxSize ~= nil and maxSize ~= 0 then" +
" redis.call('zadd', lastAccessTimeSetName, currentTime, key);" +
" local cacheSize = tonumber(redis.call('hlen', KEYS[1]));" +
" if cacheSize >= maxSize then" +
" local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize);" +
" for index, lruItem in ipairs(lruItems) do" +
" if lruItem then" +
" local lruItemValue = redis.call('hget', KEYS[1], lruItem);" +
" redis.call('hdel', KEYS[1], lruItem);" +
" redis.call('zrem', KEYS[2], lruItem);" +
" redis.call('zrem', KEYS[3], lruItem);" +
" redis.call('zrem', lastAccessTimeSetName, lruItem);" +
" local removedChannelName = KEYS[7];" +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" +
" redis.call('publish', removedChannelName, msg);" +
" end;" +
" end" +
" end;" +
" end;" +
" local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value);" +
" redis.call('publish', KEYS[4], msg);" +
"else " +
"local t, val = struct.unpack('dLc0', v);" +
"local msg = struct.pack('Lc0Lc0Lc0', string.len(key), key, string.len(value), value, string.len(val), val);" +
"redis.call('publish', KEYS[5], msg);" +
" if maxSize ~= nil and maxSize ~= 0 then " +
" redis.call('zadd', lastAccessTimeSetName, currentTime, key);" +
" end;" +
"end;"
+ "end;"
+ "end;",
Arrays.<Object>asList(getName(), getCreatedChannelName()), params.toArray());
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName(), getCreatedChannelName(),
getUpdatedChannelName(), getLastAccessTimeSetName(), getRemovedChannelName(), getOptionsName()),
params.toArray());
}
@Override
@ -1723,7 +1776,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public RFuture<Boolean> expireAtAsync(long timestamp) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local maxSize = tonumber(redis.call('hget', KEYS[5], 'max-size')); " +
"local maxSize = tonumber(redis.call('hget', KEYS[5], 'max-size')); " +
"if maxSize ~= nil and maxSize ~= 0 then " +
" redis.call('pexpire', KEYS[5], ARGV[1]); " +
" redis.call('zadd', KEYS[4], 92233720368547758, 'redisson__expiretag'); " +
@ -1741,12 +1794,19 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public RFuture<Boolean> clearExpireAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('zrem', KEYS[2], 'redisson__expiretag'); " +
"local maxSize = tonumber(redis.call('hget', KEYS[5], 'max-size')); " +
"if maxSize ~= nil and maxSize ~= 0 then " +
" redis.call('persist', KEYS[5]); " +
" redis.call('zrem', KEYS[4], 92233720368547758, 'redisson__expiretag'); " +
" redis.call('persist', KEYS[4]); " +
"end; " +
"redis.call('zrem', KEYS[2], 'redisson__expiretag'); " +
"redis.call('persist', KEYS[2]); " +
"redis.call('zrem', KEYS[3], 'redisson__expiretag'); " +
"redis.call('persist', KEYS[3]); " +
"return redis.call('persist', KEYS[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()));
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName()));
}
@Override

@ -63,8 +63,8 @@ public class EvictionScheduler {
}
}
public void schedule(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName) {
EvictionTask task = new MapCacheEvictionTask(name, timeoutSetName, maxIdleSetName, expiredChannelName, executor);
public void schedule(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName, String lastAccessTimeSetName) {
EvictionTask task = new MapCacheEvictionTask(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName, executor);
EvictionTask prevTask = tasks.putIfAbsent(name, task);
if (prevTask == null) {
task.schedule();

@ -33,13 +33,16 @@ public class MapCacheEvictionTask extends EvictionTask {
private final String timeoutSetName;
private final String maxIdleSetName;
private final String expiredChannelName;
private final String lastAccessTimeSetName;
public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName, CommandAsyncExecutor executor) {
public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName,
String expiredChannelName, String lastAccessTimeSetName, CommandAsyncExecutor executor) {
super(executor);
this.name = name;
this.timeoutSetName = timeoutSetName;
this.maxIdleSetName = maxIdleSetName;
this.expiredChannelName = expiredChannelName;
this.lastAccessTimeSetName = lastAccessTimeSetName;
}
@Override
@ -55,6 +58,7 @@ public class MapCacheEvictionTask extends EvictionTask {
+ "end;"
+ "end;"
+ "if #expiredKeys1 > 0 then "
+ "redis.call('zrem', KEYS[4], unpack(expiredKeys1)); "
+ "redis.call('zrem', KEYS[3], unpack(expiredKeys1)); "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys1)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys1)); "
@ -69,12 +73,14 @@ public class MapCacheEvictionTask extends EvictionTask {
+ "end;"
+ "end;"
+ "if #expiredKeys2 > 0 then "
+ "redis.call('zrem', KEYS[4], unpack(expiredKeys2)); "
+ "redis.call('zrem', KEYS[3], unpack(expiredKeys2)); "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys2)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys2)); "
+ "end; "
+ "return #expiredKeys1 + #expiredKeys2;",
Arrays.<Object>asList(name, timeoutSetName, maxIdleSetName, expiredChannelName), System.currentTimeMillis(), keysLimit);
Arrays.<Object>asList(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName),
System.currentTimeMillis(), keysLimit);
}
}

Loading…
Cancel
Save