expire, expireAt and clearExpire commands aren't implemented properly for RBloomFilter object. #1305

pull/1253/merge
Nikita 7 years ago
parent 48ef91c425
commit b177e94d5e

@ -19,6 +19,7 @@ import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBitSetAsync;
import org.redisson.api.RBloomFilter;
@ -54,15 +55,18 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
private volatile int hashIterations;
private final CommandExecutor commandExecutor;
private final String configName;
protected RedissonBloomFilter(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.configName = suffixName(getName(), "config");
}
protected RedissonBloomFilter(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
this.commandExecutor = commandExecutor;
this.configName = suffixName(getName(), "config");
}
private int optimalNumOfHashFunctions(long n, long m) {
@ -179,18 +183,18 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
}
private void addConfigCheck(int hashIterations, long size, CommandBatchService executorService) {
executorService.evalReadAsync(getConfigName(), codec, RedisCommands.EVAL_VOID,
executorService.evalReadAsync(configName, codec, RedisCommands.EVAL_VOID,
"local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')",
Arrays.<Object>asList(getConfigName()), size, hashIterations);
Arrays.<Object>asList(configName), size, hashIterations);
}
@Override
public long count() {
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
RFuture<Map<String, String>> configFuture = executorService.readAsync(getConfigName(), StringCodec.INSTANCE,
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), getConfigName());
RFuture<Map<String, String>> configFuture = executorService.readAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), configName);
RBitSetAsync bs = createBitSet(executorService);
RFuture<Long> cardinalityFuture = bs.cardinalityAsync();
executorService.execute();
@ -202,12 +206,12 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getConfigName());
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), configName);
}
private void readConfig() {
RFuture<Map<String, String>> future = commandExecutor.readAsync(getConfigName(), StringCodec.INSTANCE,
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), getConfigName());
RFuture<Map<String, String>> future = commandExecutor.readAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), configName);
Map<String, String> config = commandExecutor.get(future);
readConfig(config);
@ -245,13 +249,13 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
hashIterations = optimalNumOfHashFunctions(expectedInsertions, size);
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
executorService.evalReadAsync(getConfigName(), codec, RedisCommands.EVAL_VOID,
executorService.evalReadAsync(configName, codec, RedisCommands.EVAL_VOID,
"local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == false and hashIterations == false, 'Bloom filter config has been changed')",
Arrays.<Object>asList(getConfigName()), size, hashIterations);
executorService.writeAsync(getConfigName(), StringCodec.INSTANCE,
new RedisCommand<Void>("HMSET", new VoidReplayConvertor()), getConfigName(),
Arrays.<Object>asList(configName), size, hashIterations);
executorService.writeAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Void>("HMSET", new VoidReplayConvertor()), configName,
"size", size, "hashIterations", hashIterations,
"expectedInsertions", expectedInsertions, "falseProbability", BigDecimal.valueOf(falseProbability).toPlainString());
try {
@ -267,31 +271,53 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
return true;
}
private String getConfigName() {
return suffixName(getName(), "config");
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return redis.call('pexpire', KEYS[2], ARGV[1]); ",
Arrays.<Object>asList(getName(), configName),
timeUnit.toMillis(timeToLive));
}
@Override
public RFuture<Boolean> expireAtAsync(long timestamp) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpireat', KEYS[1], ARGV[1]); " +
"return redis.call('pexpireat', KEYS[2], ARGV[1]); ",
Arrays.<Object>asList(getName(), configName),
timestamp);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('persist', KEYS[1]); " +
"return redis.call('persist', KEYS[2]); ",
Arrays.<Object>asList(getName(), configName));
}
@Override
public long getExpectedInsertions() {
Long result = commandExecutor.read(getConfigName(), LongCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "expectedInsertions");
Long result = commandExecutor.read(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "expectedInsertions");
return check(result);
}
@Override
public double getFalseProbability() {
Double result = commandExecutor.read(getConfigName(), DoubleCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "falseProbability");
Double result = commandExecutor.read(configName, DoubleCodec.INSTANCE, RedisCommands.HGET, configName, "falseProbability");
return check(result);
}
@Override
public long getSize() {
Long result = commandExecutor.read(getConfigName(), LongCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "size");
Long result = commandExecutor.read(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "size");
return check(result);
}
@Override
public int getHashIterations() {
Integer result = commandExecutor.read(getConfigName(), IntegerCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "hashIterations");
Integer result = commandExecutor.read(configName, IntegerCodec.INSTANCE, RedisCommands.HGET, configName, "hashIterations");
return check(result);
}

Loading…
Cancel
Save