Feature - RBloomFilter contains() and add() methods added with element collection support #5130

pull/5139/head
Nikita Koksharov 2 years ago
parent a6a7848af5
commit 751f9f5623

@ -44,11 +44,10 @@ import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Bloom filter based on Highway 128-bit hash.
@ -99,39 +98,42 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override
public boolean add(T object) {
long[] hashes = hash(object);
return add(Arrays.asList(object)) > 0;
}
while (true) {
if (size == 0) {
readConfig();
}
@Override
public long add(Collection<T> objects) {
if (size == 0) {
readConfig();
}
int hashIterations = this.hashIterations;
long size = this.size;
List<Long> allIndexes = index(objects);
long[] indexes = hash(hashes[0], hashes[1], hashIterations, size);
CommandBatchService executorService = new CommandBatchService(commandExecutor);
addConfigCheck(hashIterations, size, executorService);
RBitSetAsync bs = createBitSet(executorService);
for (int i = 0; i < indexes.length; i++) {
bs.setAsync(indexes[i]);
CommandBatchService executorService = new CommandBatchService(commandExecutor);
addConfigCheck(hashIterations, size, executorService);
RBitSetAsync bs = createBitSet(executorService);
for (long index : allIndexes) {
bs.setAsync(index);
}
List<Boolean> result = (List<Boolean>) executorService.execute().getResponses();
List<Boolean> res = result.subList(1, result.size());
int s = allIndexes.size() / objects.size();
int c = 0;
int k = 0;
for (int i = 0; i < res.size(); i++) {
Boolean val = res.get(i);
if (!val) {
k++;
}
try {
List<Boolean> result = (List<Boolean>) executorService.execute().getResponses();
for (Boolean val : result.subList(1, result.size())) {
if (!val) {
return true;
}
}
return false;
} catch (RedisException e) {
if (e.getMessage() == null || !e.getMessage().contains("Bloom filter config has been changed")) {
throw e;
if ((i + 1) % s == 0) {
if (k > 0) {
c++;
}
k = 0;
}
}
return c;
}
private long[] hash(long hash1, long hash2, int iterations, long size) {
@ -148,6 +150,51 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
return indexes;
}
@Override
public long contains(Collection<T> objects) {
if (size == 0) {
readConfig();
}
List<Long> allIndexes = index(objects);
CommandBatchService executorService = new CommandBatchService(commandExecutor);
addConfigCheck(hashIterations, size, executorService);
RBitSetAsync bs = createBitSet(executorService);
for (long index : allIndexes) {
bs.getAsync(index);
}
List<Boolean> result = (List<Boolean>) executorService.execute().getResponses();
List<Boolean> res = result.subList(1, result.size());
int s = allIndexes.size() / objects.size();
int missed = 0;
int k = 0;
for (int i = 0; i < res.size(); i++) {
Boolean val = res.get(i);
if (!val) {
k++;
}
if ((i + 1) % s == 0) {
if (k > 0) {
missed++;
}
k = 0;
}
}
return objects.size() - missed;
}
private List<Long> index(Collection<T> objects) {
List<Long> allIndexes = new LinkedList<>();
for (T object : objects) {
long[] hashes = hash(object);
long[] indexes = hash(hashes[0], hashes[1], hashIterations, size);
allIndexes.addAll(Arrays.stream(indexes).boxed().collect(Collectors.toList()));
}
return allIndexes;
}
@Override
public boolean contains(T object) {
long[] hashes = hash(object);

@ -15,6 +15,8 @@
*/
package org.redisson.api;
import java.util.Collection;
/**
* Distributed implementation of Bloom filter based on Highway 128-bit hash.
*
@ -34,14 +36,30 @@ public interface RBloomFilter<T> extends RExpirable {
boolean add(T object);
/**
* Check for element present
* Adds elements
*
* @param elements elements to add
* @return number of added elements
*/
long add(Collection<T> elements);
/**
* Checks for element presence
*
* @param object - element
* @param object element
* @return <code>true</code> if element is present
* <code>false</code> if element is not present
*/
boolean contains(T object);
/**
* Checks for elements presence
*
* @param elements elements to check presence
* @return number of elements present
*/
long contains(Collection<T> elements);
/**
* Initializes Bloom filter params (size and hashIterations)
* calculated from <code>expectedInsertions</code> and <code>falseProbability</code>

@ -5,11 +5,42 @@ import org.junit.jupiter.api.Test;
import org.redisson.api.RBloomFilter;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonBloomFilterTest extends BaseTest {
@Test
public void testContainsAll() {
RBloomFilter<String> filter = redisson.getBloomFilter("filter");
filter.tryInit(100, 0.03);
List<String> list = Arrays.asList("1", "2", "3");
assertThat(filter.contains(list)).isEqualTo(0);
assertThat(filter.add(list)).isEqualTo(3);
assertThat(filter.contains(list)).isEqualTo(3);
assertThat(filter.contains(Arrays.asList("1", "5"))).isEqualTo(1);
}
@Test
public void testAddAll() {
RBloomFilter<String> filter = redisson.getBloomFilter("filter");
filter.tryInit(100, 0.03);
List<String> list = Arrays.asList("1", "2", "3");
assertThat(filter.add(list)).isEqualTo(3);
assertThat(filter.add(list)).isZero();
assertThat(filter.count()).isEqualTo(3);
assertThat(filter.add(Arrays.asList("1", "5"))).isEqualTo(1);
assertThat(filter.count()).isEqualTo(4);
for (String s : list) {
assertThat(filter.contains(s)).isTrue();
}
}
@Test
public void testFalseProbability1() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {

Loading…
Cancel
Save