From 751f9f5623c401f30b99bb392857ac83e6cac142 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 27 Jun 2023 10:19:43 +0300 Subject: [PATCH] Feature - RBloomFilter contains() and add() methods added with element collection support #5130 --- .../org/redisson/RedissonBloomFilter.java | 105 +++++++++++++----- .../java/org/redisson/api/RBloomFilter.java | 22 +++- .../org/redisson/RedissonBloomFilterTest.java | 31 ++++++ 3 files changed, 127 insertions(+), 31 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java index d4831b85f..2b94644e8 100644 --- a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java +++ b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java @@ -44,11 +44,10 @@ import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.Hash; import java.math.BigDecimal; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Bloom filter based on Highway 128-bit hash. @@ -99,39 +98,42 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF @Override public boolean add(T object) { - long[] hashes = hash(object); + return add(Arrays.asList(object)) > 0; + } - while (true) { - if (size == 0) { - readConfig(); - } + @Override + public long add(Collection objects) { + if (size == 0) { + readConfig(); + } - int hashIterations = this.hashIterations; - long size = this.size; + List allIndexes = index(objects); - long[] indexes = hash(hashes[0], hashes[1], hashIterations, size); - - CommandBatchService executorService = new CommandBatchService(commandExecutor); - addConfigCheck(hashIterations, size, executorService); - RBitSetAsync bs = createBitSet(executorService); - for (int i = 0; i < indexes.length; i++) { - bs.setAsync(indexes[i]); + CommandBatchService executorService = new CommandBatchService(commandExecutor); + addConfigCheck(hashIterations, size, executorService); + RBitSetAsync bs = createBitSet(executorService); + for (long index : allIndexes) { + bs.setAsync(index); + } + List result = (List) executorService.execute().getResponses(); + List res = result.subList(1, result.size()); + + int s = allIndexes.size() / objects.size(); + int c = 0; + int k = 0; + for (int i = 0; i < res.size(); i++) { + Boolean val = res.get(i); + if (!val) { + k++; } - try { - List result = (List) executorService.execute().getResponses(); - - for (Boolean val : result.subList(1, result.size())) { - if (!val) { - return true; - } - } - return false; - } catch (RedisException e) { - if (e.getMessage() == null || !e.getMessage().contains("Bloom filter config has been changed")) { - throw e; + if ((i + 1) % s == 0) { + if (k > 0) { + c++; } + k = 0; } } + return c; } private long[] hash(long hash1, long hash2, int iterations, long size) { @@ -148,6 +150,51 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF return indexes; } + @Override + public long contains(Collection objects) { + if (size == 0) { + readConfig(); + } + + List allIndexes = index(objects); + + CommandBatchService executorService = new CommandBatchService(commandExecutor); + addConfigCheck(hashIterations, size, executorService); + RBitSetAsync bs = createBitSet(executorService); + for (long index : allIndexes) { + bs.getAsync(index); + } + List result = (List) executorService.execute().getResponses(); + List res = result.subList(1, result.size()); + + int s = allIndexes.size() / objects.size(); + int missed = 0; + int k = 0; + for (int i = 0; i < res.size(); i++) { + Boolean val = res.get(i); + if (!val) { + k++; + } + if ((i + 1) % s == 0) { + if (k > 0) { + missed++; + } + k = 0; + } + } + return objects.size() - missed; + } + + private List index(Collection objects) { + List allIndexes = new LinkedList<>(); + for (T object : objects) { + long[] hashes = hash(object); + long[] indexes = hash(hashes[0], hashes[1], hashIterations, size); + allIndexes.addAll(Arrays.stream(indexes).boxed().collect(Collectors.toList())); + } + return allIndexes; + } + @Override public boolean contains(T object) { long[] hashes = hash(object); diff --git a/redisson/src/main/java/org/redisson/api/RBloomFilter.java b/redisson/src/main/java/org/redisson/api/RBloomFilter.java index 0737957c7..7bab5a37b 100644 --- a/redisson/src/main/java/org/redisson/api/RBloomFilter.java +++ b/redisson/src/main/java/org/redisson/api/RBloomFilter.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import java.util.Collection; + /** * Distributed implementation of Bloom filter based on Highway 128-bit hash. * @@ -34,14 +36,30 @@ public interface RBloomFilter extends RExpirable { boolean add(T object); /** - * Check for element present + * Adds elements + * + * @param elements elements to add + * @return number of added elements + */ + long add(Collection elements); + + /** + * Checks for element presence * - * @param object - element + * @param object element * @return true if element is present * false if element is not present */ boolean contains(T object); + /** + * Checks for elements presence + * + * @param elements elements to check presence + * @return number of elements present + */ + long contains(Collection elements); + /** * Initializes Bloom filter params (size and hashIterations) * calculated from expectedInsertions and falseProbability diff --git a/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java b/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java index 1ca6719d1..c9f2bed87 100644 --- a/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java @@ -5,11 +5,42 @@ import org.junit.jupiter.api.Test; import org.redisson.api.RBloomFilter; import java.time.Instant; +import java.util.Arrays; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; public class RedissonBloomFilterTest extends BaseTest { + @Test + public void testContainsAll() { + RBloomFilter filter = redisson.getBloomFilter("filter"); + filter.tryInit(100, 0.03); + + List list = Arrays.asList("1", "2", "3"); + assertThat(filter.contains(list)).isEqualTo(0); + assertThat(filter.add(list)).isEqualTo(3); + assertThat(filter.contains(list)).isEqualTo(3); + assertThat(filter.contains(Arrays.asList("1", "5"))).isEqualTo(1); + } + + @Test + public void testAddAll() { + RBloomFilter filter = redisson.getBloomFilter("filter"); + filter.tryInit(100, 0.03); + + List list = Arrays.asList("1", "2", "3"); + assertThat(filter.add(list)).isEqualTo(3); + assertThat(filter.add(list)).isZero(); + assertThat(filter.count()).isEqualTo(3); + assertThat(filter.add(Arrays.asList("1", "5"))).isEqualTo(1); + assertThat(filter.count()).isEqualTo(4); + for (String s : list) { + assertThat(filter.contains(s)).isTrue(); + } + + } + @Test public void testFalseProbability1() { Assertions.assertThrows(IllegalArgumentException.class, () -> {