MapCache listeners added. #879

pull/904/head
Nikita 8 years ago
parent 4a37a5e4a4
commit 0053bf325e

@ -253,17 +253,17 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(id, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this);
return new RedissonLocalCachedMap<K, V>(connectionManager.getCommandExecutor(), name, options, evictionScheduler, this);
}
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(id, codec, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this);
return new RedissonLocalCachedMap<K, V>(codec, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this);
}
@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(id, connectionManager.getCommandExecutor(), name, this);
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
@ -308,17 +308,17 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(id, evictionScheduler, connectionManager.getCommandExecutor(), name, this);
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(id, codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this);
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(id, codec, connectionManager.getCommandExecutor(), name, this);
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override

@ -102,12 +102,12 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMapAsync<K, V> getMap(String name) {
return new RedissonMap<K, V>(id, executorService, name, null);
return new RedissonMap<K, V>(executorService, name, null);
}
@Override
public <K, V> RMapAsync<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(id, codec, executorService, name, null);
return new RedissonMap<K, V>(codec, executorService, name, null);
}
@Override
@ -202,12 +202,12 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(id, codec, evictionScheduler, executorService, name, null);
return new RedissonMapCache<K, V>(codec, evictionScheduler, executorService, name, null);
}
@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(id, evictionScheduler, executorService, name, null);
return new RedissonMapCache<K, V>(evictionScheduler, executorService, name, null);
}
@Override

@ -208,17 +208,17 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private int invalidationStatusListenerId;
private volatile long lastInvalidate;
protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(id, commandExecutor, name, redisson);
init(id, name, options, redisson, evictionScheduler);
protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(commandExecutor, name, redisson);
init(name, options, redisson, evictionScheduler);
}
protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(id, codec, connectionManager, name, redisson);
init(id, name, options, redisson, evictionScheduler);
protected RedissonLocalCachedMap(Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(codec, connectionManager, name, redisson);
init(name, options, redisson, evictionScheduler);
}
private void init(UUID id, String name, LocalCachedMapOptions options, RedissonClient redisson, EvictionScheduler evictionScheduler) {
private void init(String name, LocalCachedMapOptions options, RedissonClient redisson, EvictionScheduler evictionScheduler) {
instanceId = generateId();
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE

@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
@ -47,7 +46,6 @@ import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.mapreduce.RedissonMapReduce;
import org.redisson.misc.Hash;
@ -69,18 +67,15 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
static final RedisCommand<Boolean> EVAL_REMOVE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP);
static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
private final UUID id;
private final RedissonClient redisson;
final RedissonClient redisson;
protected RedissonMap(UUID id, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
protected RedissonMap(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
this.id = id;
this.redisson = redisson;
}
public RedissonMap(UUID id, Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name);
this.id = id;
this.redisson = redisson;
}
@ -92,13 +87,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RLock getLock(K key) {
String lockName = getLockName(key);
return new RedissonLock((CommandExecutor)commandExecutor, lockName, id);
return redisson.getLock(lockName);
}
@Override
public RReadWriteLock getReadWriteLock(K key) {
String lockName = getLockName(key);
return new RedissonReadWriteLock((CommandExecutor)commandExecutor, lockName, id);
return redisson.getReadWriteLock(lockName);
}
private String getLockName(Object key) {

@ -15,6 +15,8 @@
*/
package org.redisson;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
@ -23,19 +25,27 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RMapCache;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.map.event.EntryCreatedListener;
import org.redisson.api.map.event.EntryEvent;
import org.redisson.api.map.event.EntryExpiredListener;
import org.redisson.api.map.event.EntryRemovedListener;
import org.redisson.api.map.event.EntryUpdatedListener;
import org.redisson.api.map.event.MapEntryListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
@ -45,16 +55,13 @@ import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ObjectListDecoder;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.codec.MapCacheEventCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.eviction.EvictionScheduler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.io.IOException;
import java.math.BigDecimal;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.convertor.NumberConvertor;
/**
* <p>Map-based cache with ability to set TTL for each entry via
@ -77,45 +84,39 @@ import org.redisson.client.protocol.convertor.NumberConvertor;
*/
public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCache<K, V> {
static final RedisCommand<Boolean> EVAL_PUT_IF_ABSENT = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP);
static final RedisCommand<Boolean> EVAL_HSET = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP);
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 7, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Void> EVAL_HMSET = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 4, ValueType.MAP);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_REMOVE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 9, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_PUT_TTL_IF_ABSENT = new RedisCommand<Object>("EVAL", 10, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_FAST_PUT_TTL = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 9, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_FAST_PUT_TTL_IF_ABSENT = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 10, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 12, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_PUT_TTL_IF_ABSENT = new RedisCommand<Object>("EVAL", 11, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_GET_TTL = new RedisCommand<Object>("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_CONTAINS_KEY = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP_KEY);
static final RedisCommand<Boolean> EVAL_CONTAINS_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP_VALUE);
static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY);
static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 7, ValueType.MAP_KEY);
static final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Object> EVAL_PUT_IF_ABSENT = new RedisCommand<Object>("EVAL", 5, ValueType.MAP, ValueType.MAP_VALUE);
RedissonMapCache(UUID id, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(id, commandExecutor, name, redisson);
RedissonMapCache(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
}
RedissonMapCache(UUID id, Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(id, codec, commandExecutor, name, redisson);
RedissonMapCache(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
}
public RedissonMapCache(UUID id, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(id, commandExecutor, name, redisson);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName());
public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName());
}
public RedissonMapCache(UUID id, Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(id, codec, commandExecutor, name, redisson);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName());
public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName());
}
@Override
public RFuture<Boolean> containsKeyAsync(Object key) {
checkKey(key);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_CONTAINS_KEY,
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[2]); " +
"local expireDate = 92233720368547758; " +
"if value ~= false then " +
@ -141,14 +142,15 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "return 1;" +
"end;" +
"return 0; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), key);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)),
System.currentTimeMillis(), encodeMapKey(key));
}
@Override
public RFuture<Boolean> containsValueAsync(Object value) {
checkValue(value);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_VALUE,
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local s = redis.call('hgetall', KEYS[1]); "
+ "for i, v in ipairs(s) do "
+ "if i % 2 == 0 then "
@ -179,7 +181,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "end;" +
"return 0;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), value);
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), encodeMapValue(value));
}
@Override
@ -287,23 +289,21 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "if value == false then "
+ "insertable = true; "
+ "else "
+ "if insertable == false then "
+ "local t, val = struct.unpack('dLc0', value); "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); "
+ "if expireIdle ~= false then "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "insertable = true; "
+ "end; "
+ "end; "
+ "local t, val = struct.unpack('dLc0', value); "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); "
+ "if expireIdle ~= false then "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "insertable = true; "
+ "end; "
+ "end; "
+ "if insertable == true then "
@ -322,16 +322,20 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
// value
+ "local val = struct.pack('dLc0', ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "local val = struct.pack('dLc0', tonumber(ARGV[4]), string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('hset', KEYS[1], ARGV[5], val); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return nil;"
+ "else "
+ "local t, val = struct.unpack('dLc0', value); "
+ "redis.call('zadd', KEYS[3], t + ARGV[1], ARGV[5]); "
+ "return val;"
+ "end; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName()),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
}
@Override
@ -339,7 +343,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE_VALUE,
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if value == false then "
+ "return 0; "
@ -348,11 +352,15 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "if val == ARGV[2] then "
+ "redis.call('zrem', KEYS[2], ARGV[1]); "
+ "redis.call('zrem', KEYS[3], ARGV[1]); "
+ "return redis.call('hdel', KEYS[1], ARGV[1]); "
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(val), val); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return 1; "
+ "else "
+ "return 0 "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), key, value);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelName()),
encodeMapKey(key), encodeMapValue(value));
}
@Override
@ -403,11 +411,16 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "redis.call('hset', KEYS[1], ARGV[1], value); "
+ "if v == false then "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2]); "
+ "redis.call('publish', KEYS[2], msg); "
+ "return nil; "
+ "end; "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2], string.len(val), val); "
+ "redis.call('publish', KEYS[3], msg); "
+ "return val; ",
Collections.<Object>singletonList(getName(key)), key, value);
Arrays.<Object>asList(getName(key), getCreatedChannelName(), getUpdatedChannelName()),
key, value);
}
@Override
@ -415,9 +428,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT,
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_IF_ABSENT,
"local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "if redis.call('hsetnx', KEYS[1], ARGV[1], value) == 1 then "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2]); "
+ "redis.call('publish', KEYS[2], msg); "
+ "return nil;"
+ "else "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
@ -427,7 +442,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "local t, val = struct.unpack('dLc0', v); "
+ "return val; "
+ "end",
Collections.<Object>singletonList(getName(key)), key, value);
Arrays.<Object>asList(getName(key), getCreatedChannelName()),
key, value);
}
@Override
@ -473,14 +489,22 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "end; "
+ "end; "
+ "local newValue = tonumber(ARGV[3]); "
+ "if expireDate >= tonumber(ARGV[1]) then "
+ "if value ~= false and expireDate > tonumber(ARGV[1]) then "
+ "newValue = tonumber(val) + newValue; "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(newValue), newValue, string.len(val), val); "
+ "redis.call('publish', KEYS[5], msg); "
+ "else "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "local newValuePack = struct.pack('dLc0', t + tonumber(ARGV[1]), string.len(newValue), newValue); "
+ "redis.call('hset', KEYS[1], ARGV[2], newValuePack); "
+ "return tostring(newValue); ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), keyState, valueState);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()),
System.currentTimeMillis(), keyState, valueState);
}
@Override
@ -533,20 +557,53 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta;
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_FAST_PUT_TTL,
"if tonumber(ARGV[1]) > 0 then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); "
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local insertable = false; "
+ "local value = redis.call('hget', KEYS[1], ARGV[5]); "
+ "local t, val;"
+ "if value == false then "
+ "insertable = true; "
+ "else "
+ "t, val = struct.unpack('dLc0', value); "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); "
+ "if expireIdle ~= false then "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "insertable = true; "
+ "end; "
+ "end; " +
"if tonumber(ARGV[2]) > 0 then "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[5]); "
+ "else "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[5]); "
+ "end; "
+ "if tonumber(ARGV[2]) > 0 then "
+ "redis.call('zadd', KEYS[3], ARGV[2], ARGV[4]); "
+ "if tonumber(ARGV[3]) > 0 then "
+ "redis.call('zadd', KEYS[3], ARGV[3], ARGV[5]); "
+ "else "
+ "redis.call('zrem', KEYS[3], ARGV[4]); "
+ "redis.call('zrem', KEYS[3], ARGV[5]); "
+ "end; "
+ "local value = struct.pack('dLc0', ARGV[3], string.len(ARGV[5]), ARGV[5]); " +
"return redis.call('hset', KEYS[1], ARGV[4], value); ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
+ "local value = struct.pack('dLc0', ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('hset', KEYS[1], ARGV[5], value); "
+ "if insertable == true then "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return 1;"
+ "else "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6], string.len(val), val); "
+ "redis.call('publish', KEYS[5], msg); "
+ "return 0;"
+ "end;",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
}
@Override
@ -595,25 +652,56 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_TTL,
"local v = redis.call('hget', KEYS[1], ARGV[4]); "
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); "
"local insertable = false; "
+ "local v = redis.call('hget', KEYS[1], ARGV[5]); "
+ "if v == false then "
+ "insertable = true; "
+ "else "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); "
+ "if expireIdle ~= false then "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "insertable = true; "
+ "end; "
+ "end; "
+ "if tonumber(ARGV[2]) > 0 then "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[5]); "
+ "else "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[5]); "
+ "end; "
+ "if tonumber(ARGV[2]) > 0 then "
+ "redis.call('zadd', KEYS[3], ARGV[2], ARGV[4]); "
+ "if tonumber(ARGV[3]) > 0 then "
+ "redis.call('zadd', KEYS[3], ARGV[3], ARGV[5]); "
+ "else "
+ "redis.call('zrem', KEYS[3], ARGV[4]); "
+ "redis.call('zrem', KEYS[3], ARGV[5]); "
+ "end; "
+ "local value = struct.pack('dLc0', ARGV[3], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('hset', KEYS[1], ARGV[4], value); "
+ "if v == false then "
+ "local value = struct.pack('dLc0', ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('hset', KEYS[1], ARGV[5], value); "
+ "if insertable == true then "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return nil;"
+ "end; "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6], string.len(val), val); "
+ "redis.call('publish', KEYS[5], msg); "
+ "return val",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
}
String getTimeoutSetNameByKey(Object key) {
@ -640,6 +728,38 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return prefixName("redisson__idle__set", getName());
}
String getCreatedChannelName(String name) {
return prefixName("redisson_map_cache_created", name);
}
String getCreatedChannelName() {
return prefixName("redisson_map_cache_created", getName());
}
String getUpdatedChannelName(String name) {
return prefixName("redisson_map_cache_updated", name);
}
String getUpdatedChannelName() {
return prefixName("redisson_map_cache_updated", getName());
}
String getExpiredChannelName(String name) {
return prefixName("redisson_map_cache_expired", name);
}
String getExpiredChannelName() {
return prefixName("redisson_map_cache_expired", getName());
}
String getRemovedChannelName(String name) {
return prefixName("redisson_map_cache_removed", name);
}
String getRemovedChannelName() {
return prefixName("redisson_map_cache_removed", getName());
}
@Override
public RFuture<V> removeAsync(K key) {
@ -652,10 +772,13 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(val), val); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return val; "
+ "end; "
+ "return v",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), key);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelName()),
key);
}
@Override
@ -666,9 +789,18 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_REMOVE,
"redis.call('zrem', KEYS[3], unpack(ARGV)); " +
"redis.call('zrem', KEYS[2], unpack(ARGV)); " +
"redis.call('zrem', KEYS[2], unpack(ARGV)); " +
"for i, key in ipairs(ARGV) do "
+ "local v = redis.call('hget', KEYS[1], key); "
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end;" +
"end;" +
"return redis.call('hdel', KEYS[1], unpack(ARGV)); ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), keys);
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName(), getRemovedChannelName()),
keys);
}
@Override
@ -762,10 +894,44 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_HSET,
"local val = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "return redis.call('hset', KEYS[1], ARGV[1], val); ",
Collections.<Object>singletonList(getName(key)), key, value);
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local insertable = false; "
+ "local v = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if v == false then "
+ "insertable = true; "
+ "else "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); "
+ "if expireIdle ~= false then "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "insertable = true; "
+ "end; "
+ "end; " +
"local val = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[2], val); "
+ "if insertable == true then "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return 1;"
+ "else "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3], string.len(val), val); "
+ "redis.call('publish', KEYS[5], msg); "
+ "return 0;"
+ "end;",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@Override
@ -773,11 +939,13 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_IF_ABSENT,
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if value == false then "
+ "local val = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[2], val); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return 1; "
+ "end; "
+ "local t, val = struct.unpack('dLc0', value); "
@ -805,8 +973,12 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('zrem', KEYS[3], ARGV[2]); "
+ "local val = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[2], val); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return 1; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), key, value);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName()),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@Override
@ -854,7 +1026,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta;
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_FAST_PUT_TTL_IF_ABSENT,
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local insertable = false; "
+ "local value = redis.call('hget', KEYS[1], ARGV[5]); "
+ "if value == false then "
@ -898,11 +1070,15 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "local val = struct.pack('dLc0', ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('hset', KEYS[1], ARGV[5], val); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName()),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
}
@Override
@ -915,7 +1091,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
throw new NullPointerException("map new value can't be null");
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE_VALUE,
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if v == false then "
+ "return 0;"
@ -939,12 +1115,16 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) and val == ARGV[3] then "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[4]), ARGV[4], string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "local value = struct.pack('dLc0', t, string.len(ARGV[4]), ARGV[4]); "
+ "redis.call('hset', KEYS[1], ARGV[2], value); "
+ "return 1; "
+ "end; "
+ "return 0; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), key, oldValue, newValue);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getUpdatedChannelName()),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue));
}
@Override
@ -961,11 +1141,16 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "local value = struct.pack('dLc0', t, string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[2], value); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3], string.len(val), val); "
+ "redis.call('publish', KEYS[3], msg); "
+ "return val; "
+ "else "
+ "return nil; "
+ "end",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key)), System.currentTimeMillis(), key, value);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getUpdatedChannelName()),
System.currentTimeMillis(), key, value);
}
@Override
@ -992,10 +1177,85 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "if i % 2 == 0 then "
+ "local val = struct.pack('dLc0', 0, string.len(value), value); "
+ "ARGV[i] = val; "
+ "local key = ARGV[i-1];"
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value); "
+ "redis.call('publish', KEYS[2], msg); "
+ "end;"
+ "end;"
+ "return redis.call('hmset', KEYS[1], unpack(ARGV)); ",
Collections.<Object>singletonList(getName()), params.toArray());
Arrays.<Object>asList(getName(), getCreatedChannelName()), params.toArray());
}
@Override
public int addListener(final MapEntryListener listener) {
if (listener == null) {
throw new NullPointerException();
}
if (listener instanceof EntryRemovedListener) {
RTopic<List<Object>> topic = redisson.getTopic(getRemovedChannelName(), new MapCacheEventCodec(codec));
return topic.addListener(new MessageListener<List<Object>>() {
@Override
public void onMessage(String channel, List<Object> msg) {
System.out.println("channel: " + channel);
System.out.println("msg: " + msg);
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.REMOVED, (K)msg.get(0), (V)msg.get(1), null);
((EntryRemovedListener<K, V>) listener).onRemoved(event);
}
});
}
if (listener instanceof EntryCreatedListener) {
RTopic<List<Object>> topic = redisson.getTopic(getCreatedChannelName(), new MapCacheEventCodec(codec));
return topic.addListener(new MessageListener<List<Object>>() {
@Override
public void onMessage(String channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.CREATED, (K)msg.get(0), (V)msg.get(1), null);
((EntryCreatedListener<K, V>) listener).onCreated(event);
}
});
}
if (listener instanceof EntryUpdatedListener) {
RTopic<List<Object>> topic = redisson.getTopic(getUpdatedChannelName(), new MapCacheEventCodec(codec));
return topic.addListener(new MessageListener<List<Object>>() {
@Override
public void onMessage(String channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.UPDATED, (K)msg.get(0), (V)msg.get(1), (V)msg.get(2));
((EntryUpdatedListener<K, V>) listener).onUpdated(event);
}
});
}
if (listener instanceof EntryExpiredListener) {
RTopic<List<Object>> topic = redisson.getTopic(getExpiredChannelName(), new MapCacheEventCodec(codec));
return topic.addListener(new MessageListener<List<Object>>() {
@Override
public void onMessage(String channel, List<Object> msg) {
EntryEvent<K, V> event = new EntryEvent<K, V>(RedissonMapCache.this, EntryEvent.Type.EXPIRED, (K)msg.get(0), (V)msg.get(1), null);
((EntryExpiredListener<K, V>) listener).onExpired(event);
}
});
}
throw new IllegalArgumentException("Wrong listener type " + listener.getClass());
}
@Override
public void removeListener(int listenerId) {
RTopic<List<Object>> removedTopic = redisson.getTopic(getRemovedChannelName(), new MapCacheEventCodec(codec));
removedTopic.removeListener(listenerId);
RTopic<List<Object>> createdTopic = redisson.getTopic(getCreatedChannelName(), new MapCacheEventCodec(codec));
createdTopic.removeListener(listenerId);
RTopic<List<Object>> updatedTopic = redisson.getTopic(getUpdatedChannelName(), new MapCacheEventCodec(codec));
updatedTopic.removeListener(listenerId);
RTopic<List<Object>> expiredTopic = redisson.getTopic(getExpiredChannelName(), new MapCacheEventCodec(codec));
expiredTopic.removeListener(listenerId);
}
@Override

@ -17,6 +17,8 @@ package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.redisson.api.map.event.MapEntryListener;
/**
* <p>Map-based cache with ability to set TTL for each entry via
* {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)}
@ -130,6 +132,7 @@ public interface RMapCache<K, V> extends RMap<K, V>, RMapCacheAsync<K, V> {
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param ttlUnit - time unit
*
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
@ -177,6 +180,7 @@ public interface RMapCache<K, V> extends RMap<K, V>, RMapCacheAsync<K, V> {
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param ttlUnit - time unit
*
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash
*/
@ -218,4 +222,24 @@ public interface RMapCache<K, V> extends RMap<K, V>, RMapCacheAsync<K, V> {
@Override
int size();
/**
* Adds map entry listener
*
* @see org.redisson.api.map.event.EntryCreatedListener
* @see org.redisson.api.map.event.EntryUpdatedListener
* @see org.redisson.api.map.event.EntryRemovedListener
* @see org.redisson.api.map.event.EntryExpiredListener
*
* @param listener - entry listener
* @return listener id
*/
int addListener(MapEntryListener listener);
/**
* Removes map entry listener
*
* @param listenerId - listener id
*/
void removeListener(int listenerId);
}

@ -134,7 +134,9 @@ public interface RMapCacheAsync<K, V> extends RMapAsync<K, V> {
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit - time unit
* @return <code>true</code> if value has been set successfully
*
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
RFuture<Boolean> fastPutAsync(K key, V value, long ttl, TimeUnit unit);
@ -160,7 +162,8 @@ public interface RMapCacheAsync<K, V> extends RMapAsync<K, V> {
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
* @return <code>true</code> if value has been set successfully
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
RFuture<Boolean> fastPutAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
@ -186,7 +189,8 @@ public interface RMapCacheAsync<K, V> extends RMapAsync<K, V> {
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return previous associated value
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash
*/
RFuture<Boolean> fastPutIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);

@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.map.event;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface EntryCreatedListener<K, V> extends MapEntryListener {
void onCreated(EntryEvent<K, V> event);
}

@ -0,0 +1,66 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.map.event;
import org.redisson.api.RMapCache;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public class EntryEvent<K, V> {
public enum Type {CREATED, UPDATED, REMOVED, EXPIRED}
private RMapCache<K, V> source;
private Type type;
private K key;
private V value;
private V oldValue;
public EntryEvent(RMapCache<K, V> source, Type type, K key, V value, V oldValue) {
super();
this.source = source;
this.type = type;
this.key = key;
this.value = value;
this.oldValue = oldValue;
}
public RMapCache<K, V> getSource() {
return source;
}
public Type getType() {
return type;
}
public K getKey() {
return key;
}
public V getOldValue() {
return oldValue;
}
public V getValue() {
return value;
}
}

@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.map.event;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface EntryExpiredListener<K, V> extends MapEntryListener {
void onExpired(EntryEvent<K, V> event);
}

@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.map.event;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface EntryRemovedListener<K, V> extends MapEntryListener {
void onRemoved(EntryEvent<K, V> event);
}

@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.map.event;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface EntryUpdatedListener<K, V> extends MapEntryListener {
void onUpdated(EntryEvent<K, V> event);
}

@ -0,0 +1,27 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.map.event;
import java.util.EventListener;
/**
*
* @author Nikita Koksharov
*
*/
public interface MapEntryListener extends EventListener {
}

@ -20,6 +20,11 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
/**
*
* @author Nikita Koksharov
*
*/
public class CustomObjectInputStream extends ObjectInputStream {
private ClassLoader classLoader;

@ -0,0 +1,106 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.codec;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
*/
public class MapCacheEventCodec implements Codec {
private final Codec codec;
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
List<Object> result = new ArrayList<Object>(3);
Object key = MapCacheEventCodec.this.decode(buf, state, codec.getMapKeyDecoder());
result.add(key);
Object value = MapCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder());
result.add(value);
if (buf.isReadable()) {
Object oldValue = MapCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder());
result.add(oldValue);
}
return result;
}
};
public MapCacheEventCodec(Codec codec) {
super();
this.codec = codec;
}
@Override
public Decoder<Object> getMapValueDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapKeyEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
throw new UnsupportedOperationException();
}
private Object decode(ByteBuf buf, State state, Decoder<?> decoder) throws IOException {
int keyLen;
if (PlatformDependent.isWindows()) {
keyLen = buf.readIntLE();
} else {
keyLen = (int) buf.readLongLE();
}
ByteBuf keyBuf = buf.readSlice(keyLen);
Object key = decoder.decode(keyBuf, state);
return key;
}
}

@ -63,8 +63,8 @@ public class EvictionScheduler {
}
}
public void schedule(String name, String timeoutSetName, String maxIdleSetName) {
EvictionTask task = new MapCacheEvictionTask(name, timeoutSetName, maxIdleSetName, executor);
public void schedule(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName) {
EvictionTask task = new MapCacheEvictionTask(name, timeoutSetName, maxIdleSetName, expiredChannelName, executor);
EvictionTask prevTask = tasks.putIfAbsent(name, task);
if (prevTask == null) {
task.schedule();

@ -34,7 +34,7 @@ abstract class EvictionTask implements Runnable {
final Deque<Integer> sizeHistory = new LinkedList<Integer>();
final int minDelay = 1;
final int maxDelay = 2*60*60;
final int maxDelay = 30*60;
final int keysLimit = 300;
int delay = 10;

@ -32,31 +32,49 @@ public class MapCacheEvictionTask extends EvictionTask {
private final String name;
private final String timeoutSetName;
private final String maxIdleSetName;
private final String expiredChannelName;
public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName, CommandAsyncExecutor executor) {
public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName, CommandAsyncExecutor executor) {
super(executor);
this.name = name;
this.timeoutSetName = timeoutSetName;
this.maxIdleSetName = maxIdleSetName;
this.expiredChannelName = expiredChannelName;
}
@Override
RFuture<Integer> execute() {
return executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredKeys1 = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "for i, key in ipairs(expiredKeys1) do "
+ "local v = redis.call('hget', KEYS[1], key); "
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end;"
+ "end;"
+ "if #expiredKeys1 > 0 then "
+ "redis.call('zrem', KEYS[3], unpack(expiredKeys1)); "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys1)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys1)); "
+ "end; "
+ "local expiredKeys2 = redis.call('zrangebyscore', KEYS[3], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "for i, key in ipairs(expiredKeys2) do "
+ "local v = redis.call('hget', KEYS[1], key); "
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end;"
+ "end;"
+ "if #expiredKeys2 > 0 then "
+ "redis.call('zrem', KEYS[3], unpack(expiredKeys2)); "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys2)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys2)); "
+ "end; "
+ "return #expiredKeys1 + #expiredKeys2;",
Arrays.<Object>asList(name, timeoutSetName, maxIdleSetName), System.currentTimeMillis(), keysLimit);
Arrays.<Object>asList(name, timeoutSetName, maxIdleSetName, expiredChannelName), System.currentTimeMillis(), keysLimit);
}
}

@ -70,12 +70,12 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(id, evictionScheduler, commandExecutor, name, null);
this.mapCache = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null);
}
public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(id, codec, evictionScheduler, commandExecutor, name, null);
this.mapCache = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null);
}
@Override

@ -50,12 +50,12 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonMap<K, V>(null, codec, commandExecutor, name, null);
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null);
}
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonMap<K, V>(null, codec, commandExecutor, name, null);
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null);
}
@Override

@ -14,17 +14,26 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.client.codec.LongCodec;
import org.redisson.api.map.event.EntryCreatedListener;
import org.redisson.api.map.event.EntryEvent;
import org.redisson.api.map.event.EntryExpiredListener;
import org.redisson.api.map.event.EntryRemovedListener;
import org.redisson.api.map.event.EntryUpdatedListener;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.MsgPackJacksonCodec;
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
public class RedissonMapCacheTest extends BaseTest {
public static class SimpleKey implements Serializable {
@ -671,11 +680,164 @@ public class RedissonMapCacheTest extends BaseTest {
}
@Test
public void testCreatedListener() {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
checkCreatedListener(map, 1, 2, () -> map.put(1, 2));
checkCreatedListener(map, 10, 2, () -> map.put(10, 2, 2, TimeUnit.SECONDS));
checkCreatedListener(map, 2, 5, () -> map.fastPut(2, 5));
checkCreatedListener(map, 13, 2, () -> map.fastPut(13, 2, 2, TimeUnit.SECONDS));
checkCreatedListener(map, 3, 2, () -> map.putIfAbsent(3, 2));
checkCreatedListener(map, 14, 2, () -> map.putIfAbsent(14, 2, 2, TimeUnit.SECONDS));
checkCreatedListener(map, 4, 1, () -> map.fastPutIfAbsent(4, 1));
checkCreatedListener(map, 15, 2, () -> map.fastPutIfAbsent(15, 2, 2, TimeUnit.SECONDS));
checkCreatedListener(map, 5, 0, () -> map.addAndGet(5, 0));
}
private void checkCreatedListener(RMapCache<Integer, Integer> map, Integer key, Integer value, Runnable runnable) {
AtomicBoolean ref = new AtomicBoolean();
int createListener1 = map.addListener(new EntryCreatedListener<Integer, Integer>() {
@Override
public void onCreated(EntryEvent<Integer, Integer> event) {
assertThat(event.getKey()).isEqualTo(key);
assertThat(event.getValue()).isEqualTo(value);
if (!ref.compareAndSet(false, true)) {
Assert.fail();
}
}
});
runnable.run();
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref);
map.removeListener(createListener1);
}
@Test
public void testUpdatedListener() {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
map.put(1, 1);
checkUpdatedListener(map, 1, 3, 1, () -> map.put(1, 3));
map.put(10, 1);
checkUpdatedListener(map, 10, 2, 1, () -> map.put(10, 2, 2, TimeUnit.SECONDS));
map.put(2, 1);
checkUpdatedListener(map, 2, 5, 1, () -> map.fastPut(2, 5));
map.put(13, 1);
checkUpdatedListener(map, 13, 2, 1, () -> map.fastPut(13, 2, 2, TimeUnit.SECONDS));
map.put(14, 1);
checkUpdatedListener(map, 14, 2, 1, () -> map.replace(14, 2));
checkUpdatedListener(map, 14, 3, 2, () -> map.replace(14, 2, 3));
map.put(5, 1);
checkUpdatedListener(map, 5, 4, 1, () -> map.addAndGet(5, 3));
}
@Test
public void testExpiredListener() {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
checkExpiredListener(map, 10, 2, () -> map.put(10, 2, 2, TimeUnit.SECONDS));
checkExpiredListener(map, 13, 2, () -> map.fastPut(13, 2, 2, TimeUnit.SECONDS));
checkExpiredListener(map, 14, 2, () -> map.putIfAbsent(14, 2, 2, TimeUnit.SECONDS));
checkExpiredListener(map, 15, 2, () -> map.fastPutIfAbsent(15, 2, 2, TimeUnit.SECONDS));
}
private void checkExpiredListener(RMapCache<Integer, Integer> map, Integer key, Integer value, Runnable runnable) {
AtomicBoolean ref = new AtomicBoolean();
int createListener1 = map.addListener(new EntryExpiredListener<Integer, Integer>() {
@Override
public void onExpired(EntryEvent<Integer, Integer> event) {
assertThat(event.getKey()).isEqualTo(key);
assertThat(event.getValue()).isEqualTo(value);
if (!ref.compareAndSet(false, true)) {
Assert.fail();
}
}
});
runnable.run();
Awaitility.await().atMost(Duration.ONE_MINUTE).untilTrue(ref);
map.removeListener(createListener1);
}
private void checkUpdatedListener(RMapCache<Integer, Integer> map, Integer key, Integer value, Integer oldValue, Runnable runnable) {
AtomicBoolean ref = new AtomicBoolean();
int createListener1 = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
@Override
public void onUpdated(EntryEvent<Integer, Integer> event) {
assertThat(event.getKey()).isEqualTo(key);
assertThat(event.getValue()).isEqualTo(value);
assertThat(event.getOldValue()).isEqualTo(oldValue);
if (!ref.compareAndSet(false, true)) {
Assert.fail();
}
}
});
runnable.run();
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref);
map.removeListener(createListener1);
}
@Test
public void testRemovedListener() {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
map.put(1, 1);
checkRemovedListener(map, 1, 1, () -> map.remove(1, 1));
map.put(10, 1);
checkRemovedListener(map, 10, 1, () -> map.remove(10));
map.put(2, 1);
checkRemovedListener(map, 2, 1, () -> map.fastRemove(2));
}
private void checkRemovedListener(RMapCache<Integer, Integer> map, Integer key, Integer value, Runnable runnable) {
AtomicBoolean ref = new AtomicBoolean();
int createListener1 = map.addListener(new EntryRemovedListener<Integer, Integer>() {
@Override
public void onRemoved(EntryEvent<Integer, Integer> event) {
assertThat(event.getKey()).isEqualTo(key);
assertThat(event.getValue()).isEqualTo(value);
if (!ref.compareAndSet(false, true)) {
Assert.fail();
}
}
});
runnable.run();
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref);
map.removeListener(createListener1);
}
@Test
public void testFastPut() throws Exception {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");
Assert.assertTrue(map.fastPut(1, 2));
assertThat(map.get(1)).isEqualTo(2);
Assert.assertFalse(map.fastPut(1, 3));
assertThat(map.get(1)).isEqualTo(3);
Assert.assertEquals(1, map.size());
}

Loading…
Cancel
Save