diff --git a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java index 4cd41f22f..9ae9ce054 100644 --- a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java +++ b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java @@ -19,6 +19,7 @@ import java.math.BigDecimal; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.redisson.api.RBitSetAsync; import org.redisson.api.RBloomFilter; @@ -54,15 +55,18 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF private volatile int hashIterations; private final CommandExecutor commandExecutor; + private final String configName; protected RedissonBloomFilter(CommandExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; + this.configName = suffixName(getName(), "config"); } protected RedissonBloomFilter(Codec codec, CommandExecutor commandExecutor, String name) { super(codec, commandExecutor, name); this.commandExecutor = commandExecutor; + this.configName = suffixName(getName(), "config"); } private int optimalNumOfHashFunctions(long n, long m) { @@ -179,18 +183,18 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF } private void addConfigCheck(int hashIterations, long size, CommandBatchService executorService) { - executorService.evalReadAsync(getConfigName(), codec, RedisCommands.EVAL_VOID, + executorService.evalReadAsync(configName, codec, RedisCommands.EVAL_VOID, "local size = redis.call('hget', KEYS[1], 'size');" + "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + "assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')", - Arrays.asList(getConfigName()), size, hashIterations); + Arrays.asList(configName), size, hashIterations); } @Override public long count() { CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); - RFuture> configFuture = executorService.readAsync(getConfigName(), StringCodec.INSTANCE, - new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), getConfigName()); + RFuture> configFuture = executorService.readAsync(configName, StringCodec.INSTANCE, + new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), configName); RBitSetAsync bs = createBitSet(executorService); RFuture cardinalityFuture = bs.cardinalityAsync(); executorService.execute(); @@ -202,12 +206,12 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF @Override public RFuture deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getConfigName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), configName); } private void readConfig() { - RFuture> future = commandExecutor.readAsync(getConfigName(), StringCodec.INSTANCE, - new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), getConfigName()); + RFuture> future = commandExecutor.readAsync(configName, StringCodec.INSTANCE, + new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), configName); Map config = commandExecutor.get(future); readConfig(config); @@ -245,13 +249,13 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF hashIterations = optimalNumOfHashFunctions(expectedInsertions, size); CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); - executorService.evalReadAsync(getConfigName(), codec, RedisCommands.EVAL_VOID, + executorService.evalReadAsync(configName, codec, RedisCommands.EVAL_VOID, "local size = redis.call('hget', KEYS[1], 'size');" + "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + "assert(size == false and hashIterations == false, 'Bloom filter config has been changed')", - Arrays.asList(getConfigName()), size, hashIterations); - executorService.writeAsync(getConfigName(), StringCodec.INSTANCE, - new RedisCommand("HMSET", new VoidReplayConvertor()), getConfigName(), + Arrays.asList(configName), size, hashIterations); + executorService.writeAsync(configName, StringCodec.INSTANCE, + new RedisCommand("HMSET", new VoidReplayConvertor()), configName, "size", size, "hashIterations", hashIterations, "expectedInsertions", expectedInsertions, "falseProbability", BigDecimal.valueOf(falseProbability).toPlainString()); try { @@ -267,31 +271,53 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF return true; } - private String getConfigName() { - return suffixName(getName(), "config"); + @Override + public RFuture expireAsync(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "return redis.call('pexpire', KEYS[2], ARGV[1]); ", + Arrays.asList(getName(), configName), + timeUnit.toMillis(timeToLive)); + } + + @Override + public RFuture expireAtAsync(long timestamp) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpireat', KEYS[1], ARGV[1]); " + + "return redis.call('pexpireat', KEYS[2], ARGV[1]); ", + Arrays.asList(getName(), configName), + timestamp); } + @Override + public RFuture clearExpireAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('persist', KEYS[1]); " + + "return redis.call('persist', KEYS[2]); ", + Arrays.asList(getName(), configName)); + } + @Override public long getExpectedInsertions() { - Long result = commandExecutor.read(getConfigName(), LongCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "expectedInsertions"); + Long result = commandExecutor.read(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "expectedInsertions"); return check(result); } @Override public double getFalseProbability() { - Double result = commandExecutor.read(getConfigName(), DoubleCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "falseProbability"); + Double result = commandExecutor.read(configName, DoubleCodec.INSTANCE, RedisCommands.HGET, configName, "falseProbability"); return check(result); } @Override public long getSize() { - Long result = commandExecutor.read(getConfigName(), LongCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "size"); + Long result = commandExecutor.read(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "size"); return check(result); } @Override public int getHashIterations() { - Integer result = commandExecutor.read(getConfigName(), IntegerCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "hashIterations"); + Integer result = commandExecutor.read(configName, IntegerCodec.INSTANCE, RedisCommands.HGET, configName, "hashIterations"); return check(result); }