RedissonMapCache cluster support improvements

pull/872/merge
Nikita 8 years ago
parent dac88df5e2
commit 0fe6a42c0c

@ -85,7 +85,7 @@ import io.netty.util.concurrent.FutureListener;
public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCache<K, V> { public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCache<K, V> {
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 7, ValueType.MAP, ValueType.MAP_VALUE); static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 7, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Void> EVAL_HMSET = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 4, ValueType.MAP); static final RedisCommand<Void> EVAL_HMSET = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE); private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 12, ValueType.MAP, ValueType.MAP_VALUE); private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 12, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_PUT_TTL_IF_ABSENT = new RedisCommand<Object>("EVAL", 11, ValueType.MAP, ValueType.MAP_VALUE); private static final RedisCommand<Object> EVAL_PUT_TTL_IF_ABSENT = new RedisCommand<Object>("EVAL", 11, ValueType.MAP, ValueType.MAP_VALUE);
@ -111,7 +111,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
super(codec, commandExecutor, name, redisson); super(codec, commandExecutor, name, redisson);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName()); evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName());
} }
@Override @Override
public RFuture<Boolean> containsKeyAsync(Object key) { public RFuture<Boolean> containsKeyAsync(Object key) {
checkKey(key); checkKey(key);
@ -185,7 +185,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
} }
@Override @Override
public RFuture<Map<K, V>> getAllAsync(Set<K> keys) { public RFuture<Map<K, V>> getAllAsync(final Set<K> keys) {
if (keys.isEmpty()) { if (keys.isEmpty()) {
return newSucceededFuture(Collections.<K, V>emptyMap()); return newSucceededFuture(Collections.<K, V>emptyMap());
} }
@ -230,7 +230,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; " + "end; "
+ "return map;", + "return map;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), args.toArray()); Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), args.toArray());
} }
@Override @Override
@ -289,22 +289,22 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "if value == false then " + "if value == false then "
+ "insertable = true; " + "insertable = true; "
+ "else " + "else "
+ "local t, val = struct.unpack('dLc0', value); " + "local t, val = struct.unpack('dLc0', value); "
+ "local expireDate = 92233720368547758; " + "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); " + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); "
+ "if expireDateScore ~= false then " + "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) " + "expireDate = tonumber(expireDateScore) "
+ "end; " + "end; "
+ "if t ~= 0 then " + "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); " + "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); "
+ "if expireIdle ~= false then " + "if expireIdle ~= false then "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) " + "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; " + "end; "
+ "end; " + "end; "
+ "if expireDate <= tonumber(ARGV[1]) then " + "if expireDate <= tonumber(ARGV[1]) then "
+ "insertable = true; " + "insertable = true; "
+ "end; " + "end; "
+ "end; " + "end; "
+ "if insertable == true then " + "if insertable == true then "
// ttl // ttl
@ -334,7 +334,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('zadd', KEYS[3], t + ARGV[1], ARGV[5]); " + "redis.call('zadd', KEYS[3], t + ARGV[1], ARGV[5]); "
+ "return val;" + "return val;"
+ "end; ", + "end; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
} }
@ -359,7 +359,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "else " + "else "
+ "return 0; " + "return 0; "
+ "end", + "end",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelNameByKey(key)),
encodeMapKey(key), encodeMapValue(value)); encodeMapKey(key), encodeMapValue(value));
} }
@ -419,7 +419,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2], string.len(val), val); " + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2], string.len(val), val); "
+ "redis.call('publish', KEYS[3], msg); " + "redis.call('publish', KEYS[3], msg); "
+ "return val; ", + "return val; ",
Arrays.<Object>asList(getName(key), getCreatedChannelName(), getUpdatedChannelName()), Arrays.<Object>asList(getName(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
key, value); key, value);
} }
@ -442,7 +442,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "local t, val = struct.unpack('dLc0', v); " + "local t, val = struct.unpack('dLc0', v); "
+ "return val; " + "return val; "
+ "end", + "end",
Arrays.<Object>asList(getName(key), getCreatedChannelName()), Arrays.<Object>asList(getName(key), getCreatedChannelNameByKey(key)),
key, value); key, value);
} }
@ -503,7 +503,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "local newValuePack = struct.pack('dLc0', t + tonumber(ARGV[1]), string.len(newValue), newValue); " + "local newValuePack = struct.pack('dLc0', t + tonumber(ARGV[1]), string.len(newValue), newValue); "
+ "redis.call('hset', KEYS[1], ARGV[2], newValuePack); " + "redis.call('hset', KEYS[1], ARGV[2], newValuePack); "
+ "return tostring(newValue); ", + "return tostring(newValue); ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), keyState, valueState); System.currentTimeMillis(), keyState, valueState);
} }
@ -563,13 +563,13 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "local t, val;" + "local t, val;"
+ "if value == false then " + "if value == false then "
+ "insertable = true; " + "insertable = true; "
+ "else " + "else "
+ "t, val = struct.unpack('dLc0', value); " + "t, val = struct.unpack('dLc0', value); "
+ "local expireDate = 92233720368547758; " + "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); " + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); "
+ "if expireDateScore ~= false then " + "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) " + "expireDate = tonumber(expireDateScore) "
+ "end; " + "end; "
+ "if t ~= 0 then " + "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); " + "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); "
+ "if expireIdle ~= false then " + "if expireIdle ~= false then "
@ -602,7 +602,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('publish', KEYS[5], msg); " + "redis.call('publish', KEYS[5], msg); "
+ "return 0;" + "return 0;"
+ "end;", + "end;",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value)); System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
} }
@ -656,13 +656,13 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "local v = redis.call('hget', KEYS[1], ARGV[5]); " + "local v = redis.call('hget', KEYS[1], ARGV[5]); "
+ "if v == false then " + "if v == false then "
+ "insertable = true; " + "insertable = true; "
+ "else " + "else "
+ "local t, val = struct.unpack('dLc0', v); " + "local t, val = struct.unpack('dLc0', v); "
+ "local expireDate = 92233720368547758; " + "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); " + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); "
+ "if expireDateScore ~= false then " + "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) " + "expireDate = tonumber(expireDateScore) "
+ "end; " + "end; "
+ "if t ~= 0 then " + "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); " + "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); "
+ "if expireIdle ~= false then " + "if expireIdle ~= false then "
@ -700,7 +700,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('publish', KEYS[5], msg); " + "redis.call('publish', KEYS[5], msg); "
+ "return val", + "return val",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
} }
@ -728,21 +728,33 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return prefixName("redisson__idle__set", getName()); return prefixName("redisson__idle__set", getName());
} }
String getCreatedChannelNameByKey(Object key) {
return prefixName("redisson_map_cache_created", getName(key));
}
String getCreatedChannelName(String name) { String getCreatedChannelName(String name) {
return prefixName("redisson_map_cache_created", name); return prefixName("redisson_map_cache_created", name);
} }
String getCreatedChannelName() { String getCreatedChannelName() {
return prefixName("redisson_map_cache_created", getName()); return prefixName("redisson_map_cache_created", getName());
} }
String getUpdatedChannelName(String name) { String getUpdatedChannelNameByKey(Object key) {
return prefixName("redisson_map_cache_updated", name); return prefixName("redisson_map_cache_updated", getName(key));
} }
String getUpdatedChannelName() { String getUpdatedChannelName() {
return prefixName("redisson_map_cache_updated", getName()); return prefixName("redisson_map_cache_updated", getName());
} }
String getUpdatedChannelName(String name) {
return prefixName("redisson_map_cache_updated", name);
}
String getExpiredChannelNameByKey(Object key) {
return prefixName("redisson_map_cache_expired", getName(key));
}
String getExpiredChannelName(String name) { String getExpiredChannelName(String name) {
return prefixName("redisson_map_cache_expired", name); return prefixName("redisson_map_cache_expired", name);
@ -752,13 +764,17 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return prefixName("redisson_map_cache_expired", getName()); return prefixName("redisson_map_cache_expired", getName());
} }
String getRemovedChannelName(String name) { String getRemovedChannelNameByKey(Object key) {
return prefixName("redisson_map_cache_removed", name); return prefixName("redisson_map_cache_removed", getName(key));
} }
String getRemovedChannelName() { String getRemovedChannelName() {
return prefixName("redisson_map_cache_removed", getName()); return prefixName("redisson_map_cache_removed", getName());
} }
String getRemovedChannelName(String name) {
return prefixName("redisson_map_cache_removed", name);
}
@Override @Override
@ -777,7 +793,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "return val; " + "return val; "
+ "end; " + "end; "
+ "return v", + "return v",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelNameByKey(key)),
key); key);
} }
@ -789,7 +805,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_REMOVE, return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_REMOVE,
"redis.call('zrem', KEYS[3], unpack(ARGV)); " + "redis.call('zrem', KEYS[3], unpack(ARGV)); " +
"redis.call('zrem', KEYS[2], unpack(ARGV)); " + "redis.call('zrem', KEYS[2], unpack(ARGV)); " +
"for i, key in ipairs(ARGV) do " "for i, key in ipairs(ARGV) do "
+ "local v = redis.call('hget', KEYS[1], key); " + "local v = redis.call('hget', KEYS[1], key); "
+ "if v ~= false then " + "if v ~= false then "
@ -930,7 +946,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('publish', KEYS[5], msg); " + "redis.call('publish', KEYS[5], msg); "
+ "return 0;" + "return 0;"
+ "end;", + "end;",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
} }
@ -977,7 +993,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('publish', KEYS[4], msg); " + "redis.call('publish', KEYS[4], msg); "
+ "return 1; ", + "return 1; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key)),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
} }
@ -1077,7 +1093,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "else " + "else "
+ "return 0; " + "return 0; "
+ "end; ", + "end; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value)); System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
} }
@ -1123,7 +1139,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "return 1; " + "return 1; "
+ "end; " + "end; "
+ "return 0; ", + "return 0; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getUpdatedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue)); System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue));
} }
@ -1149,12 +1165,12 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "else " + "else "
+ "return nil; " + "return nil; "
+ "end", + "end",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getUpdatedChannelName()), Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), key, value); System.currentTimeMillis(), key, value);
} }
@Override @Override
public RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) { public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
if (map.isEmpty()) { if (map.isEmpty()) {
return newSucceededFuture(null); return newSucceededFuture(null);
} }
@ -1198,9 +1214,6 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return topic.addListener(new MessageListener<List<Object>>() { return topic.addListener(new MessageListener<List<Object>>() {
@Override @Override
public void onMessage(String channel, List<Object> msg) { public void onMessage(String channel, List<Object> msg) {
System.out.println("channel: " + channel);
System.out.println("msg: " + msg);
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.REMOVED, (K)msg.get(0), (V)msg.get(1), null); EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.REMOVED, (K)msg.get(0), (V)msg.get(1), null);
((EntryRemovedListener<K, V>) listener).onRemoved(event); ((EntryRemovedListener<K, V>) listener).onRemoved(event);
} }

Loading…
Cancel
Save