RBloomFilter added. #190
parent
a95185afdd
commit
dcab532b84
@ -0,0 +1,295 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.redisson.client.RedisException;
|
||||
import org.redisson.client.codec.Codec;
|
||||
import org.redisson.client.codec.DoubleCodec;
|
||||
import org.redisson.client.codec.IntegerCodec;
|
||||
import org.redisson.client.codec.LongCodec;
|
||||
import org.redisson.client.codec.StringCodec;
|
||||
import org.redisson.client.protocol.RedisCommand;
|
||||
import org.redisson.client.protocol.RedisCommands;
|
||||
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
|
||||
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
|
||||
import org.redisson.command.CommandBatchService;
|
||||
import org.redisson.command.CommandExecutor;
|
||||
import org.redisson.core.RBloomFilter;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
import net.openhft.hashing.LongHashFunction;
|
||||
|
||||
/**
|
||||
* Bloom filter based on 64-bit hash derived from 128-bit hash (xxHash 64-bit + FarmHash 64-bit).
|
||||
*
|
||||
* Code parts from Guava BloomFilter
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomFilter<T> {
|
||||
|
||||
private static final long MAX_SIZE = Integer.MAX_VALUE*2L;
|
||||
|
||||
private volatile long size;
|
||||
private volatile int hashIterations;
|
||||
|
||||
private final CommandExecutor commandExecutor;
|
||||
|
||||
protected RedissonBloomFilter(CommandExecutor commandExecutor, String name) {
|
||||
super(commandExecutor, name);
|
||||
this.commandExecutor = commandExecutor;
|
||||
}
|
||||
|
||||
protected RedissonBloomFilter(Codec codec, CommandExecutor commandExecutor, String name) {
|
||||
super(codec, commandExecutor, name);
|
||||
this.commandExecutor = commandExecutor;
|
||||
}
|
||||
|
||||
private int optimalNumOfHashFunctions(long n, long m) {
|
||||
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
|
||||
}
|
||||
|
||||
private long optimalNumOfBits(long n, double p) {
|
||||
if (p == 0) {
|
||||
p = Double.MIN_VALUE;
|
||||
}
|
||||
return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(T object) {
|
||||
byte[] state = encode(object);
|
||||
|
||||
while (true) {
|
||||
if (size == 0) {
|
||||
readConfig();
|
||||
}
|
||||
|
||||
int hashIterations = this.hashIterations;
|
||||
long size = this.size;
|
||||
|
||||
long[] indexes = hash(state, hashIterations, size);
|
||||
|
||||
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
|
||||
addConfigCheck(hashIterations, size, executorService);
|
||||
for (int i = 0; i < indexes.length; i++) {
|
||||
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), indexes[i], 1);
|
||||
}
|
||||
try {
|
||||
List<Boolean> result = (List<Boolean>) executorService.execute();
|
||||
|
||||
for (Boolean val : result.subList(1, result.size()-1)) {
|
||||
if (val) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} catch (RedisException e) {
|
||||
if (!e.getMessage().contains("Bloom filter config has been changed")) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private long[] hash(byte[] state, int iterations, long size) {
|
||||
long hash1 = LongHashFunction.xx_r39().hashBytes(state);
|
||||
long hash2 = LongHashFunction.farmUo().hashBytes(state);
|
||||
|
||||
long[] indexes = new long[iterations];
|
||||
long hash = hash1;
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
indexes[i] = (hash & Long.MAX_VALUE) % size;
|
||||
if (i % 2 == 0) {
|
||||
hash += hash2;
|
||||
} else {
|
||||
hash += hash1;
|
||||
}
|
||||
}
|
||||
return indexes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(T object) {
|
||||
byte[] state = encode(object);
|
||||
|
||||
while (true) {
|
||||
if (size == 0) {
|
||||
readConfig();
|
||||
}
|
||||
|
||||
int hashIterations = this.hashIterations;
|
||||
long size = this.size;
|
||||
|
||||
long[] indexes = hash(state, hashIterations, size);
|
||||
|
||||
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
|
||||
addConfigCheck(hashIterations, size, executorService);
|
||||
for (int i = 0; i < indexes.length; i++) {
|
||||
executorService.readAsync(getName(), codec, RedisCommands.GETBIT, getName(), indexes[i]);
|
||||
}
|
||||
try {
|
||||
List<Boolean> result = (List<Boolean>) executorService.execute();
|
||||
|
||||
for (Boolean val : result.subList(1, result.size()-1)) {
|
||||
if (!val) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (RedisException e) {
|
||||
if (!e.getMessage().contains("Bloom filter config has been changed")) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private byte[] encode(T object) {
|
||||
byte[] state = null;
|
||||
try {
|
||||
state = codec.getValueEncoder().encode(object);
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
private void addConfigCheck(int hashIterations, long size, CommandBatchService executorService) {
|
||||
executorService.evalReadAsync(getConfigName(), codec, RedisCommands.EVAL_VOID,
|
||||
"local size = redis.call('hget', KEYS[1], 'size');" +
|
||||
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
|
||||
"assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')",
|
||||
Arrays.<Object>asList(getConfigName()), size, hashIterations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int count() {
|
||||
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
|
||||
Future<Map<String, String>> configFuture = executorService.readAsync(getConfigName(), StringCodec.INSTANCE,
|
||||
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), getConfigName());
|
||||
Future<Long> cardinalityFuture = executorService.readAsync(getName(), codec, RedisCommands.BITCOUNT, getName());
|
||||
executorService.execute();
|
||||
|
||||
readConfig(configFuture.getNow());
|
||||
|
||||
return (int) (-size / ((double) hashIterations) * Math.log(1 - cardinalityFuture.getNow() / ((double) size)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> deleteAsync() {
|
||||
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getConfigName());
|
||||
}
|
||||
|
||||
private void readConfig() {
|
||||
Future<Map<String, String>> future = commandExecutor.readAsync(getConfigName(), StringCodec.INSTANCE,
|
||||
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), getConfigName());
|
||||
Map<String, String> config = commandExecutor.get(future);
|
||||
|
||||
readConfig(config);
|
||||
}
|
||||
|
||||
private void readConfig(Map<String, String> config) {
|
||||
if (config.get("hashIterations") == null
|
||||
|| config.get("size") == null) {
|
||||
throw new IllegalStateException("Bloom filter is not initialized!");
|
||||
}
|
||||
size = Long.valueOf(config.get("size"));
|
||||
hashIterations = Integer.valueOf(config.get("hashIterations"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryInit(long expectedInsertions, double falseProbability) {
|
||||
try {
|
||||
readConfig();
|
||||
return false;
|
||||
} catch (IllegalStateException e) {
|
||||
// skip
|
||||
}
|
||||
|
||||
size = optimalNumOfBits(expectedInsertions, falseProbability);
|
||||
if (size > MAX_SIZE) {
|
||||
throw new IllegalArgumentException("Bloom filter can't be greater than " + MAX_SIZE + ". But calculated size is " + size);
|
||||
}
|
||||
hashIterations = optimalNumOfHashFunctions(expectedInsertions, size);
|
||||
|
||||
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
|
||||
executorService.evalReadAsync(getConfigName(), codec, RedisCommands.EVAL_VOID,
|
||||
"local size = redis.call('hget', KEYS[1], 'size');" +
|
||||
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
|
||||
"assert(size == false and hashIterations == false, 'Bloom filter config has been changed')",
|
||||
Arrays.<Object>asList(getConfigName()), size, hashIterations);
|
||||
executorService.writeAsync(getConfigName(), StringCodec.INSTANCE,
|
||||
new RedisCommand<Void>("HMSET", new VoidReplayConvertor()), getConfigName(),
|
||||
"size", size, "hashIterations", hashIterations,
|
||||
"expectedInsertions", expectedInsertions, "falseProbability", BigDecimal.valueOf(falseProbability).toPlainString());
|
||||
try {
|
||||
executorService.execute();
|
||||
} catch (RedisException e) {
|
||||
if (!e.getMessage().contains("Bloom filter config has been changed")) {
|
||||
throw e;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private String getConfigName() {
|
||||
return "{" + getName() + "}" + "__config";
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getExpectedInsertions() {
|
||||
Long result = commandExecutor.read(getConfigName(), LongCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "expectedInsertions");
|
||||
return check(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getFalseProbability() {
|
||||
Double result = commandExecutor.read(getConfigName(), DoubleCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "falseProbability");
|
||||
return check(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSize() {
|
||||
Long result = commandExecutor.read(getConfigName(), LongCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "size");
|
||||
return check(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHashIterations() {
|
||||
Integer result = commandExecutor.read(getConfigName(), IntegerCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "hashIterations");
|
||||
return check(result);
|
||||
}
|
||||
|
||||
private <V> V check(V result) {
|
||||
if (result == null) {
|
||||
throw new IllegalStateException("Bloom filter is not initialized!");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.client.codec;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
import org.redisson.client.handler.State;
|
||||
import org.redisson.client.protocol.Decoder;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
public class DoubleCodec extends StringCodec {
|
||||
|
||||
public static final DoubleCodec INSTANCE = new DoubleCodec();
|
||||
|
||||
public final Decoder<Object> decoder = new Decoder<Object>() {
|
||||
@Override
|
||||
public Object decode(ByteBuf buf, State state) {
|
||||
return new BigDecimal(buf.toString(CharsetUtil.UTF_8)).doubleValue();
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public Decoder<Object> getValueDecoder() {
|
||||
return decoder;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.client.codec;
|
||||
|
||||
import org.redisson.client.handler.State;
|
||||
import org.redisson.client.protocol.Decoder;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
public class IntegerCodec extends StringCodec {
|
||||
|
||||
public static final IntegerCodec INSTANCE = new IntegerCodec();
|
||||
|
||||
public final Decoder<Object> decoder = new Decoder<Object>() {
|
||||
@Override
|
||||
public Object decode(ByteBuf buf, State state) {
|
||||
return Integer.valueOf(buf.toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public Decoder<Object> getValueDecoder() {
|
||||
return decoder;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.client.protocol.convertor;
|
||||
|
||||
public class BitSetReplayConvertor extends SingleConvertor<Boolean> {
|
||||
|
||||
@Override
|
||||
public Boolean convert(Object obj) {
|
||||
return Long.valueOf(0).equals(obj);
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.core;
|
||||
|
||||
/**
|
||||
* Bloom filter based on 64-bit hash derived from 128-bit hash (xxHash + FarmHash).
|
||||
*
|
||||
* Code parts from Guava BloomFilter
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
public interface RBloomFilter<T> extends RExpirable {
|
||||
|
||||
boolean add(T object);
|
||||
|
||||
boolean contains(T object);
|
||||
|
||||
/**
|
||||
* Initializes Bloom filter params (size and hashIterations)
|
||||
* calculated from <code>expectedInsertions</code> and <code>falseProbability</code>
|
||||
* Stores config to Redis server.
|
||||
*
|
||||
* @param expectedInsertions
|
||||
* @param falseProbability
|
||||
* @return <code>true</code> if Bloom filter initialized
|
||||
* <code>false</code> if Bloom filter already has been initialized
|
||||
*/
|
||||
boolean tryInit(long expectedInsertions, double falseProbability);
|
||||
|
||||
long getExpectedInsertions();
|
||||
|
||||
double getFalseProbability();
|
||||
|
||||
long getSize();
|
||||
|
||||
int getHashIterations();
|
||||
|
||||
/**
|
||||
* Calculates number of elements already added to Bloom filter.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
int count();
|
||||
|
||||
}
|
@ -0,0 +1,68 @@
|
||||
package org.redisson;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.redisson.core.RBloomFilter;
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
public class RedissonBloomFilterTest extends BaseTest {
|
||||
|
||||
@Test
|
||||
public void testConfig() {
|
||||
RBloomFilter<String> filter = redisson.getBloomFilter("filter");
|
||||
filter.tryInit(100, 0.03);
|
||||
assertThat(filter.getExpectedInsertions()).isEqualTo(100);
|
||||
assertThat(filter.getFalseProbability()).isEqualTo(0.03);
|
||||
assertThat(filter.getHashIterations()).isEqualTo(5);
|
||||
assertThat(filter.getSize()).isEqualTo(729);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInit() {
|
||||
RBloomFilter<String> filter = redisson.getBloomFilter("filter");
|
||||
assertThat(filter.tryInit(55000000L, 0.03)).isTrue();
|
||||
assertThat(filter.tryInit(55000001L, 0.03)).isFalse();
|
||||
|
||||
filter.delete();
|
||||
|
||||
assertThat(filter.tryInit(55000001L, 0.03)).isTrue();
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testNotInitializedOnExpectedInsertions() {
|
||||
RBloomFilter<String> filter = redisson.getBloomFilter("filter");
|
||||
|
||||
filter.getExpectedInsertions();
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testNotInitializedOnContains() {
|
||||
RBloomFilter<String> filter = redisson.getBloomFilter("filter");
|
||||
|
||||
filter.contains("32");
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testNotInitializedOnAdd() {
|
||||
RBloomFilter<String> filter = redisson.getBloomFilter("filter");
|
||||
|
||||
filter.add("123");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
RBloomFilter<String> filter = redisson.getBloomFilter("filter");
|
||||
filter.tryInit(550000000L, 0.03);
|
||||
|
||||
assertThat(filter.contains("123")).isFalse();
|
||||
assertThat(filter.add("123")).isTrue();
|
||||
assertThat(filter.contains("123")).isTrue();
|
||||
assertThat(filter.add("123")).isFalse();
|
||||
assertThat(filter.count()).isEqualTo(1);
|
||||
|
||||
assertThat(filter.contains("hflgs;jl;ao1-32471320o31803-24")).isFalse();
|
||||
assertThat(filter.add("hflgs;jl;ao1-32471320o31803-24")).isTrue();
|
||||
assertThat(filter.contains("hflgs;jl;ao1-32471320o31803-24")).isTrue();
|
||||
assertThat(filter.count()).isEqualTo(2);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue