|
|
@ -36,7 +36,6 @@ import org.redisson.client.protocol.RedisCommands;
|
|
|
|
import org.redisson.client.protocol.RedisStrictCommand;
|
|
|
|
import org.redisson.client.protocol.RedisStrictCommand;
|
|
|
|
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
|
|
|
|
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
|
|
|
|
import org.redisson.client.protocol.convertor.Convertor;
|
|
|
|
import org.redisson.client.protocol.convertor.Convertor;
|
|
|
|
import org.redisson.client.protocol.convertor.LongReplayConvertor;
|
|
|
|
|
|
|
|
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
|
|
|
|
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
|
|
|
|
import org.redisson.client.protocol.decoder.ListScanResult;
|
|
|
|
import org.redisson.client.protocol.decoder.ListScanResult;
|
|
|
|
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
|
|
|
|
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
|
|
|
@ -71,7 +70,6 @@ import reactor.rx.action.support.DefaultSubscriber;
|
|
|
|
public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive implements RSetCacheReactive<V> {
|
|
|
|
public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive implements RSetCacheReactive<V> {
|
|
|
|
|
|
|
|
|
|
|
|
private static final RedisCommand<Void> ADD_ALL = new RedisCommand<Void>("HMSET", new VoidReplayConvertor());
|
|
|
|
private static final RedisCommand<Void> ADD_ALL = new RedisCommand<Void>("HMSET", new VoidReplayConvertor());
|
|
|
|
private static final RedisCommand<Long> EVAL_ADD = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5);
|
|
|
|
|
|
|
|
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
|
|
|
|
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
|
|
|
|
private static final RedisStrictCommand<Boolean> HDEL = new RedisStrictCommand<Boolean>("HDEL", new BooleanReplayConvertor());
|
|
|
|
private static final RedisStrictCommand<Boolean> HDEL = new RedisStrictCommand<Boolean>("HDEL", new BooleanReplayConvertor());
|
|
|
|
|
|
|
|
|
|
|
@ -127,16 +125,16 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
|
|
|
|
byte[] key = hash(o);
|
|
|
|
byte[] key = hash(o);
|
|
|
|
|
|
|
|
|
|
|
|
Publisher<List<Object>> future = commandExecutor.evalReadReactive(getName(), codec, EVAL_CONTAINS_KEY,
|
|
|
|
Publisher<List<Object>> future = commandExecutor.evalReadReactive(getName(), codec, EVAL_CONTAINS_KEY,
|
|
|
|
"local value = redis.call('hexists', KEYS[1], KEYS[3]); " +
|
|
|
|
"local value = redis.call('hexists', KEYS[1], ARGV[1]); " +
|
|
|
|
"local expireDate = 92233720368547758; " +
|
|
|
|
"local expireDate = 92233720368547758; " +
|
|
|
|
"if value == 1 then " +
|
|
|
|
"if value == 1 then " +
|
|
|
|
"local expireDateScore = redis.call('zscore', KEYS[2], KEYS[3]); "
|
|
|
|
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[1]); "
|
|
|
|
+ "if expireDateScore ~= false then "
|
|
|
|
+ "if expireDateScore ~= false then "
|
|
|
|
+ "expireDate = tonumber(expireDateScore) "
|
|
|
|
+ "expireDate = tonumber(expireDateScore) "
|
|
|
|
+ "end; " +
|
|
|
|
+ "end; " +
|
|
|
|
"end;" +
|
|
|
|
"end;" +
|
|
|
|
"return {expireDate, value}; ",
|
|
|
|
"return {expireDate, value}; ",
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName(), key));
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), key);
|
|
|
|
|
|
|
|
|
|
|
|
addExpireListener(result, future, new BooleanReplayConvertor(), false);
|
|
|
|
addExpireListener(result, future, new BooleanReplayConvertor(), false);
|
|
|
|
|
|
|
|
|
|
|
@ -206,6 +204,25 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public Publisher<Boolean> add(V value, long ttl, TimeUnit unit) {
|
|
|
|
public Publisher<Boolean> add(V value, long ttl, TimeUnit unit) {
|
|
|
|
|
|
|
|
if (ttl < 0) {
|
|
|
|
|
|
|
|
throw new IllegalArgumentException("TTL can't be negative");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if (ttl == 0) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
byte[] objectState = encode(value);
|
|
|
|
|
|
|
|
byte[] key = hash(objectState);
|
|
|
|
|
|
|
|
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
|
|
|
|
"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +
|
|
|
|
|
|
|
|
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
|
|
|
|
|
|
|
|
"return 1; " +
|
|
|
|
|
|
|
|
"end; " +
|
|
|
|
|
|
|
|
"return 0; ",
|
|
|
|
|
|
|
|
Arrays.<Object>asList(getName()), key, objectState);
|
|
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (unit == null) {
|
|
|
|
if (unit == null) {
|
|
|
|
throw new NullPointerException("TimeUnit param can't be null");
|
|
|
|
throw new NullPointerException("TimeUnit param can't be null");
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -216,13 +233,13 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
|
|
|
|
|
|
|
|
|
|
|
|
long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
|
|
|
|
long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
|
|
|
|
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
"redis.call('zadd', KEYS[2], ARGV[1], KEYS[3]); " +
|
|
|
|
"redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " +
|
|
|
|
"if redis.call('hexists', KEYS[1], KEYS[3]) == 0 then " +
|
|
|
|
"if redis.call('hexists', KEYS[1], ARGV[3]) == 0 then " +
|
|
|
|
"redis.call('hset', KEYS[1], KEYS[3], ARGV[2]); " +
|
|
|
|
"redis.call('hset', KEYS[1], ARGV[3], ARGV[2]); " +
|
|
|
|
"return 1; " +
|
|
|
|
"return 1; " +
|
|
|
|
"end;" +
|
|
|
|
"end;" +
|
|
|
|
"return 0; ",
|
|
|
|
"return 0; ",
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName(), key), timeoutDate, objectState);
|
|
|
|
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeoutDate, objectState, key);
|
|
|
|
} catch (IOException e) {
|
|
|
|
} catch (IOException e) {
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -233,15 +250,20 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public Publisher<Long> add(V e) {
|
|
|
|
public Publisher<Long> add(V value) {
|
|
|
|
byte[] key = hash(e);
|
|
|
|
try {
|
|
|
|
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_ADD,
|
|
|
|
byte[] objectState = encode(value);
|
|
|
|
"if redis.call('hexists', KEYS[1], KEYS[2]) == 0 then " +
|
|
|
|
byte[] key = hash(objectState);
|
|
|
|
"redis.call('hset', KEYS[1], KEYS[2], ARGV[1]); " +
|
|
|
|
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LONG,
|
|
|
|
"return 1; " +
|
|
|
|
"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +
|
|
|
|
"end; " +
|
|
|
|
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
|
|
|
|
"return 0; ",
|
|
|
|
"return 1; " +
|
|
|
|
Arrays.<Object>asList(getName(), key), e);
|
|
|
|
"end; " +
|
|
|
|
|
|
|
|
"return 0; ",
|
|
|
|
|
|
|
|
Arrays.<Object>asList(getName()), key, objectState);
|
|
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|