|
|
|
@ -18,7 +18,6 @@ package org.redisson;
|
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.Iterator;
|
|
|
|
|
import java.util.List;
|
|
|
|
@ -27,20 +26,25 @@ import java.util.Set;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
import org.redisson.client.codec.Codec;
|
|
|
|
|
import org.redisson.client.codec.LongCodec;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommand;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommand.ValueType;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
|
|
|
|
|
import org.redisson.client.protocol.convertor.Convertor;
|
|
|
|
|
import org.redisson.client.protocol.convertor.LongReplayConvertor;
|
|
|
|
|
import org.redisson.client.protocol.decoder.MapScanResult;
|
|
|
|
|
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
|
|
|
|
|
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
|
|
|
|
|
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
|
|
|
|
|
import org.redisson.client.protocol.decoder.TTLMapValueReplayDecoder;
|
|
|
|
|
import org.redisson.command.CommandAsyncExecutor;
|
|
|
|
|
import org.redisson.connection.decoder.MapGetAllDecoder;
|
|
|
|
|
import org.redisson.core.RCache;
|
|
|
|
|
|
|
|
|
|
import io.netty.util.concurrent.Future;
|
|
|
|
|
import io.netty.util.concurrent.FutureListener;
|
|
|
|
|
import io.netty.util.concurrent.Promise;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap}
|
|
|
|
@ -59,50 +63,60 @@ public class RedissonCache<K, V> extends RedissonMap<K, V> implements RCache<K,
|
|
|
|
|
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.MAP);
|
|
|
|
|
private static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
|
|
|
|
|
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
|
|
|
|
|
private static final RedisCommand<Object> EVAL_GET_TTL = new RedisCommand<Object>("EVAL", 6, ValueType.MAP_KEY, ValueType.MAP_VALUE);
|
|
|
|
|
private static final RedisCommand<List<Object>> EVAL_GET_TTL = new RedisCommand<List<Object>>("EVAL", new TTLMapValueReplayDecoder<Object>(), 5, ValueType.MAP_KEY, ValueType.MAP_VALUE);
|
|
|
|
|
private static final RedisCommand<List<Object>> EVAL_CONTAINS = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5);
|
|
|
|
|
|
|
|
|
|
private static final RedisCommand<Boolean> EVAL_CONTAINS = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6);
|
|
|
|
|
private static final RedisCommand<Set<Object>> EVAL_HKEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
|
|
|
|
|
private static final RedisCommand<List<Object>> EVAL_HVALS = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), ValueType.MAP_VALUE);
|
|
|
|
|
private static final RedisCommand<Map<Object, Object>> EVAL_HGETALL = new RedisCommand<Map<Object, Object>>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP);
|
|
|
|
|
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 2);
|
|
|
|
|
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 2, ValueType.MAP_KEY);
|
|
|
|
|
|
|
|
|
|
private static final RedisCommand<Long> EVAL_REMOVE_EXPIRED = new RedisCommand<Long>("EVAL", 5);
|
|
|
|
|
|
|
|
|
|
protected RedissonCache(CommandAsyncExecutor commandExecutor, String name) {
|
|
|
|
|
super(commandExecutor, name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RedissonCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
|
|
|
|
|
public RedissonCache(Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
|
|
|
|
|
super(codec, commandExecutor, name);
|
|
|
|
|
|
|
|
|
|
// commandExecutor.getConnectionManager().getGroup().scheduleWithFixedDelay(new Runnable() {
|
|
|
|
|
// @Override
|
|
|
|
|
// public void run() {
|
|
|
|
|
// commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
|
|
|
|
|
// "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 1000); "
|
|
|
|
|
// + "if table.getn(expiredKeys) > 0 then "
|
|
|
|
|
// + "expiredKeys = unpack(expiredKeys); "
|
|
|
|
|
// + "redis.call('zrem', KEYS[2], expiredKeys); "
|
|
|
|
|
// + "redis.call('hdel', KEYS[1], expiredKeys); "
|
|
|
|
|
// + "end; ",
|
|
|
|
|
// Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
|
|
|
|
|
// }
|
|
|
|
|
// }, initialDelay, delay, unit)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Future<Integer> sizeAsync() {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER,
|
|
|
|
|
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
|
|
|
|
|
+ "if table.getn(expiredKeys) > 0 then "
|
|
|
|
|
+ "expiredKeys = unpack(expiredKeys); "
|
|
|
|
|
+ "redis.call('zrem', KEYS[2], expiredKeys); "
|
|
|
|
|
+ "redis.call('hdel', KEYS[1], expiredKeys); "
|
|
|
|
|
+ "end; "
|
|
|
|
|
+ "return redis.call('hlen', KEYS[1]);",
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
|
|
|
|
|
return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Future<Boolean> containsKeyAsync(Object key) {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS,
|
|
|
|
|
"local v = redis.call('hexists', KEYS[1], ARGV[2]); "
|
|
|
|
|
+ "if v == 1 then "
|
|
|
|
|
+ "local expireDate = redis.call('zscore', KEYS[2], ARGV[2]); "
|
|
|
|
|
+ "if expireDate ~= false and expireDate <= ARGV[1] then "
|
|
|
|
|
+ "redis.call('zrem', KEYS[2], ARGV[2]); "
|
|
|
|
|
+ "redis.call('hdel', KEYS[1], ARGV[2]); "
|
|
|
|
|
+ "return false;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "return true;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "return false;",
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), key);
|
|
|
|
|
Promise<Boolean> result = newPromise();
|
|
|
|
|
|
|
|
|
|
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS,
|
|
|
|
|
"local value = redis.call('hexists', KEYS[1], ARGV[1]); " +
|
|
|
|
|
"local expireDate = 92233720368547758; " +
|
|
|
|
|
"if value == 1 then " +
|
|
|
|
|
"expireDate = redis.call('zscore', KEYS[2], ARGV[1]); "
|
|
|
|
|
+ "if expireDate ~= false then "
|
|
|
|
|
+ "expireDate = tonumber(expireDate) "
|
|
|
|
|
+ "end; " +
|
|
|
|
|
"end;" +
|
|
|
|
|
"return {expireDate, value}; ",
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), key);
|
|
|
|
|
|
|
|
|
|
addExpireListener(result, future, new BooleanReplayConvertor(), false);
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -153,50 +167,8 @@ public class RedissonCache<K, V> extends RedissonMap<K, V> implements RCache<K,
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), args.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Future<Set<K>> keySetAsync() {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HKEYS,
|
|
|
|
|
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
|
|
|
|
|
+ "if table.getn(expiredKeys) > 0 then "
|
|
|
|
|
+ "expiredKeys = unpack(expiredKeys); "
|
|
|
|
|
+ "redis.call('zrem', KEYS[2], expiredKeys); "
|
|
|
|
|
+ "redis.call('hdel', KEYS[1], expiredKeys); "
|
|
|
|
|
+ "end; "
|
|
|
|
|
+ "return redis.call('hkeys', KEYS[1]);",
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Future<Collection<V>> valuesAsync() {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HVALS,
|
|
|
|
|
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
|
|
|
|
|
+ "if table.getn(expiredKeys) > 0 then "
|
|
|
|
|
+ "expiredKeys = unpack(expiredKeys); "
|
|
|
|
|
+ "redis.call('zrem', KEYS[2], expiredKeys); "
|
|
|
|
|
+ "redis.call('hdel', KEYS[1], expiredKeys); "
|
|
|
|
|
+ "end; "
|
|
|
|
|
+ "return redis.call('hvals', KEYS[1]);",
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Set<java.util.Map.Entry<K, V>> entrySet() {
|
|
|
|
|
Future<Map<K, V>> f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_HGETALL,
|
|
|
|
|
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
|
|
|
|
|
+ "if table.getn(expiredKeys) > 0 then "
|
|
|
|
|
+ "expiredKeys = unpack(expiredKeys); "
|
|
|
|
|
+ "redis.call('zrem', KEYS[2], expiredKeys); "
|
|
|
|
|
+ "redis.call('hdel', KEYS[1], expiredKeys); "
|
|
|
|
|
+ "end; "
|
|
|
|
|
+ "return redis.call('hgetall', KEYS[1]);",
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
|
|
|
|
|
|
|
|
|
|
Map<K, V> map = get(f);
|
|
|
|
|
return map.entrySet();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public V putIfAbsent(K key, V value, long ttl, TimeUnit unit) {
|
|
|
|
|
return get(putIfAbsent(key, value, ttl, unit));
|
|
|
|
|
return get(putIfAbsentAsync(key, value, ttl, unit));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Future<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit unit) {
|
|
|
|
@ -229,16 +201,58 @@ public class RedissonCache<K, V> extends RedissonMap<K, V> implements RCache<K,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Future<V> getAsync(K key) {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL,
|
|
|
|
|
"local v = redis.call('hget', KEYS[1], ARGV[2]); "
|
|
|
|
|
+ "local expireDate = redis.call('zscore', KEYS[2], ARGV[2]); "
|
|
|
|
|
+ "if expireDate ~= false and expireDate <= ARGV[1] then "
|
|
|
|
|
+ "redis.call('zrem', KEYS[2], ARGV[2]); "
|
|
|
|
|
+ "redis.call('hdel', KEYS[1], ARGV[2]); "
|
|
|
|
|
+ "return nil;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "return v;",
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), key);
|
|
|
|
|
Promise<V> result = newPromise();
|
|
|
|
|
|
|
|
|
|
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_GET_TTL,
|
|
|
|
|
"local value = redis.call('hget', KEYS[1], ARGV[1]); " +
|
|
|
|
|
"local expireDate = redis.call('zscore', KEYS[2], ARGV[1]); "
|
|
|
|
|
+ "if expireDate == false then "
|
|
|
|
|
+ "expireDate = 92233720368547758; "
|
|
|
|
|
+ "end; " +
|
|
|
|
|
"return {expireDate, value}; ",
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), key);
|
|
|
|
|
|
|
|
|
|
addExpireListener(result, future, null, null);
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <T> void addExpireListener(final Promise<T> result, Future<List<Object>> future, final Convertor<T> convertor, final T nullValue) {
|
|
|
|
|
future.addListener(new FutureListener<List<Object>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<List<Object>> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.setFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
List<Object> res = future.getNow();
|
|
|
|
|
Long expireDate = (Long) res.get(0);
|
|
|
|
|
long currentDate = System.currentTimeMillis();
|
|
|
|
|
if (expireDate <= currentDate) {
|
|
|
|
|
result.setSuccess(nullValue);
|
|
|
|
|
expireMap(currentDate);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (convertor != null) {
|
|
|
|
|
result.setSuccess((T) convertor.convert(res.get(1)));
|
|
|
|
|
} else {
|
|
|
|
|
result.setSuccess((T) res.get(1));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void expireMap(long currentDate) {
|
|
|
|
|
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
|
|
|
|
|
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
|
|
|
|
|
+ "if table.getn(expiredKeys) > 0 then "
|
|
|
|
|
+ "expiredKeys = unpack(expiredKeys); "
|
|
|
|
|
+ "redis.call('zrem', KEYS[2], expiredKeys); "
|
|
|
|
|
+ "redis.call('hdel', KEYS[1], expiredKeys); "
|
|
|
|
|
+ "end;",
|
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), currentDate);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public V put(K key, V value, long ttl, TimeUnit unit) {
|
|
|
|
|