diff --git a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java index 1e46b6e3f..0fb9fccf8 100644 --- a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java +++ b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java @@ -29,22 +29,21 @@ package org.redisson; import io.netty.buffer.ByteBuf; -import org.redisson.api.RBitSetAsync; import org.redisson.api.RBloomFilter; import org.redisson.api.RFuture; -import org.redisson.client.RedisException; -import org.redisson.client.codec.*; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.command.CommandBatchService; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.Hash; -import java.math.BigDecimal; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -102,21 +101,29 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF } @Override - public long add(Collection objects) { + public RFuture addAsync(T object) { + CompletionStage f = addAsync(Arrays.asList(object)).thenApply(r -> r > 0); + return new CompletableFutureWrapper<>(f); + } + + @Override + public RFuture addAsync(Collection objects) { + CompletionStage future = CompletableFuture.completedFuture(null); if (size == 0) { - readConfig(); + future = readConfigAsync(); } - List allIndexes = index(objects); + CompletionStage f = future.thenCompose(r -> { + List allIndexes = index(objects); - List params = new ArrayList<>(); - params.add(size); - params.add(hashIterations); - int s = allIndexes.size() / objects.size(); - params.add(s); - params.addAll(allIndexes); + List params = new ArrayList<>(); + params.add(size); + params.add(hashIterations); + int s = allIndexes.size() / objects.size(); + params.add(s); + params.addAll(allIndexes); - return get(commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG, + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local size = redis.call('hget', KEYS[1], 'size');" + "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + "assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" + @@ -137,7 +144,15 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF "end; " + "return c;", Arrays.asList(configName, getRawName()), - params.toArray())); + params.toArray()); + }); + + return new CompletableFutureWrapper<>(f); + } + + @Override + public long add(Collection objects) { + return get(addAsync(objects)); } private long[] hash(long hash1, long hash2, int iterations, long size) { @@ -155,42 +170,51 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF } @Override - public long contains(Collection objects) { + public RFuture containsAsync(Collection objects) { + CompletionStage future = CompletableFuture.completedFuture(null); if (size == 0) { - readConfig(); + future = readConfigAsync(); } - List allIndexes = index(objects); - - List params = new ArrayList<>(); - params.add(size); - params.add(hashIterations); - params.add(objects.size()); - params.addAll(allIndexes); - - return get(commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG, - "local size = redis.call('hget', KEYS[1], 'size');" + - "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + - "assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" + - - "local k = 0;" + - "local c = 0;" + - "local cc = (#ARGV - 3) / ARGV[3];" + - "for i = 4, #ARGV, 1 do " + - "local r = redis.call('getbit', KEYS[2], ARGV[i]); " + - "if r == 0 then " + - "k = k + 1;" + - "end; " + - "if ((i - 4) + 1) % cc == 0 then " + - "if k > 0 then " + - "c = c + 1;" + + CompletionStage f = future.thenCompose(r -> { + List allIndexes = index(objects); + + List params = new ArrayList<>(); + params.add(size); + params.add(hashIterations); + params.add(objects.size()); + params.addAll(allIndexes); + + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, + "local size = redis.call('hget', KEYS[1], 'size');" + + "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + + "assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" + + + "local k = 0;" + + "local c = 0;" + + "local cc = (#ARGV - 3) / ARGV[3];" + + "for i = 4, #ARGV, 1 do " + + "local r = redis.call('getbit', KEYS[2], ARGV[i]); " + + "if r == 0 then " + + "k = k + 1;" + + "end; " + + "if ((i - 4) + 1) % cc == 0 then " + + "if k > 0 then " + + "c = c + 1;" + + "end; " + + "k = 0; " + + "end; " + "end; " + - "k = 0; " + - "end; " + - "end; " + - "return ARGV[3] - c;", - Arrays.asList(configName, getRawName()), - params.toArray())); + "return ARGV[3] - c;", + Arrays.asList(configName, getRawName()), + params.toArray()); + }); + return new CompletableFutureWrapper<>(f); + } + + @Override + public long contains(Collection objects) { + return get(containsAsync(objects)); } private List index(Collection objects) { @@ -208,22 +232,27 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF return contains(Arrays.asList(object)) > 0; } - protected RBitSetAsync createBitSet(CommandBatchService executorService) { - return new RedissonBitSet(executorService, getName()); + @Override + public RFuture containsAsync(T object) { + CompletionStage f = containsAsync(Arrays.asList(object)).thenApply(r -> r > 0); + return new CompletableFutureWrapper<>(f); } @Override public long count() { - CommandBatchService executorService = new CommandBatchService(commandExecutor); - RFuture> configFuture = executorService.readAsync(configName, StringCodec.INSTANCE, - new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), configName); - RBitSetAsync bs = createBitSet(executorService); - RFuture cardinalityFuture = bs.cardinalityAsync(); - executorService.execute(); - - readConfig(commandExecutor.getNow(configFuture.toCompletableFuture())); + return get(countAsync()); + } - return Math.round(-size / ((double) hashIterations) * Math.log(1 - commandExecutor.getNow(cardinalityFuture.toCompletableFuture()) / ((double) size))); + @Override + public RFuture countAsync() { + CompletionStage f = readConfigAsync(); + CompletionStage res = f.thenCompose(r -> { + RedissonBitSet bs = new RedissonBitSet(commandExecutor, getName()); + return bs.cardinalityAsync().thenApply(c -> { + return Math.round(-size / ((double) hashIterations) * Math.log(1 - c / ((double) size))); + }); + }); + return new CompletableFutureWrapper<>(res); } @Override @@ -237,12 +266,12 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF return super.sizeInMemoryAsync(keys); } - private void readConfig() { + private CompletionStage readConfigAsync() { RFuture> future = commandExecutor.readAsync(configName, StringCodec.INSTANCE, new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), configName); - Map config = commandExecutor.get(future); - - readConfig(config); + return future.thenAccept(config -> { + readConfig(config); + }); } private void readConfig(Map config) { @@ -260,6 +289,11 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF @Override public boolean tryInit(long expectedInsertions, double falseProbability) { + return get(tryInitAsync(expectedInsertions, falseProbability)); + } + + @Override + public RFuture tryInitAsync(long expectedInsertions, double falseProbability) { if (falseProbability > 1) { throw new IllegalArgumentException("Bloom filter false probability can't be greater than 1"); } @@ -276,29 +310,21 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF } hashIterations = optimalNumOfHashFunctions(expectedInsertions, size); - CommandBatchService executorService = new CommandBatchService(commandExecutor); - executorService.evalReadAsync(configName, codec, RedisCommands.EVAL_VOID, - "local size = redis.call('hget', KEYS[1], 'size');" + - "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + - "assert(size == false and hashIterations == false, 'Bloom filter config has been changed')", - Arrays.asList(configName), size, hashIterations); - executorService.writeAsync(configName, StringCodec.INSTANCE, - new RedisCommand("HMSET", new VoidReplayConvertor()), configName, - "size", size, "hashIterations", hashIterations, - "expectedInsertions", expectedInsertions, "falseProbability", BigDecimal.valueOf(falseProbability).toPlainString()); - try { - executorService.execute(); - } catch (RedisException e) { - if (e.getMessage() == null || !e.getMessage().contains("Bloom filter config has been changed")) { - throw e; - } - readConfig(); - return false; - } + return commandExecutor.evalWriteAsync(configName, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('exists', KEYS[1]) == 1 then " + + "return 0;" + + "end; " + - return true; + "redis.call('hset', KEYS[1], 'size', ARGV[1]);" + + "redis.call('hset', KEYS[1], 'hashIterations', ARGV[2]);" + + "redis.call('hset', KEYS[1], 'expectedInsertions', ARGV[3]);" + + "redis.call('hset', KEYS[1], 'falseProbability', ARGV[4]);" + + "return 1;", + Arrays.asList(configName), + size, hashIterations, expectedInsertions, falseProbability); } + @Override public RFuture expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { return super.expireAsync(timeToLive, timeUnit, param, getRawName(), configName); @@ -316,26 +342,53 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF @Override public long getExpectedInsertions() { - Long result = get(commandExecutor.readAsync(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "expectedInsertions")); - return check(result); + return get(getExpectedInsertionsAsync()); + } + + @Override + public RFuture getExpectedInsertionsAsync() { + return readSettingAsync(RedisCommands.EVAL_LONG, LongCodec.INSTANCE, "expectedInsertions"); + } + + private RFuture readSettingAsync(RedisCommand evalCommandType, Codec codec, String settingName) { + return commandExecutor.evalReadAsync(configName, codec, evalCommandType, + "if redis.call('exists', KEYS[1]) == 0 then " + + "assert(false, 'Bloom filter is not initialized')" + + "end; " + + + "return redis.call('hget', KEYS[1], ARGV[1]);", + Arrays.asList(configName), + settingName); } @Override public double getFalseProbability() { - Double result = get(commandExecutor.readAsync(configName, DoubleCodec.INSTANCE, RedisCommands.HGET, configName, "falseProbability")); - return check(result); + return get(getFalseProbabilityAsync()); + } + + @Override + public RFuture getFalseProbabilityAsync() { + return readSettingAsync(RedisCommands.EVAL_DOUBLE, DoubleCodec.INSTANCE, "falseProbability"); } @Override public long getSize() { - Long result = get(commandExecutor.readAsync(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "size")); - return check(result); + return get(getSizeAsync()); + } + + @Override + public RFuture getSizeAsync() { + return readSettingAsync(RedisCommands.EVAL_LONG, LongCodec.INSTANCE, "size"); } @Override public int getHashIterations() { - Integer result = get(commandExecutor.readAsync(configName, IntegerCodec.INSTANCE, RedisCommands.HGET, configName, "hashIterations")); - return check(result); + return get(getHashIterationsAsync()); + } + + @Override + public RFuture getHashIterationsAsync() { + return readSettingAsync(RedisCommands.EVAL_INTEGER, LongCodec.INSTANCE, "hashIterations"); } @Override @@ -381,11 +434,4 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF return new CompletableFutureWrapper<>(f); } - private V check(V result) { - if (result == null) { - throw new IllegalStateException("Bloom filter is not initialized!"); - } - return result; - } - } diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 9463eac7c..1f2cd5a16 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -870,6 +870,24 @@ public class RedissonReactive implements RedissonReactiveClient { return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(ca, params.getName()), RBitSetReactive.class); } + @Override + public RBloomFilterReactive getBloomFilter(String name) { + return ReactiveProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(commandExecutor, name), RBloomFilterReactive.class); + } + + @Override + public RBloomFilterReactive getBloomFilter(String name, Codec codec) { + return ReactiveProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(codec, commandExecutor, name), RBloomFilterReactive.class); + } + + @Override + public RBloomFilterReactive getBloomFilter(PlainOptions options) { + PlainParams params = (PlainParams) options; + CommandReactiveExecutor ca = commandExecutor.copy(params); + return ReactiveProxyBuilder.create(commandExecutor, + new RedissonBloomFilter(params.getCodec(), ca, params.getName()), RBloomFilterReactive.class); + } + @Override public RFunctionReactive getFunction() { return ReactiveProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor), RFunctionReactive.class); diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 05652b027..d9a90bc24 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -873,6 +873,24 @@ public class RedissonRx implements RedissonRxClient { return RxProxyBuilder.create(commandExecutor, new RedissonBitSet(ce, params.getName()), RBitSetRx.class); } + @Override + public RBloomFilterRx getBloomFilter(String name) { + return RxProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(commandExecutor, name), RBloomFilterRx.class); + } + + @Override + public RBloomFilterRx getBloomFilter(String name, Codec codec) { + return RxProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(codec, commandExecutor, name), RBloomFilterRx.class); + } + + @Override + public RBloomFilterRx getBloomFilter(PlainOptions options) { + PlainParams params = (PlainParams) options; + CommandRxExecutor ce = commandExecutor.copy(params); + return RxProxyBuilder.create(commandExecutor, + new RedissonBloomFilter(params.getCodec(), ce, params.getName()), RBloomFilterRx.class); + } + @Override public RFunctionRx getFunction() { return RxProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor), RFunctionRx.class); diff --git a/redisson/src/main/java/org/redisson/api/RBloomFilter.java b/redisson/src/main/java/org/redisson/api/RBloomFilter.java index 0d92dee71..5e176e008 100644 --- a/redisson/src/main/java/org/redisson/api/RBloomFilter.java +++ b/redisson/src/main/java/org/redisson/api/RBloomFilter.java @@ -24,7 +24,7 @@ import java.util.Collection; * * @param - type of object */ -public interface RBloomFilter extends RExpirable { +public interface RBloomFilter extends RExpirable, RBloomFilterAsync { /** * Adds element diff --git a/redisson/src/main/java/org/redisson/api/RBloomFilterAsync.java b/redisson/src/main/java/org/redisson/api/RBloomFilterAsync.java new file mode 100644 index 000000000..4d9354948 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RBloomFilterAsync.java @@ -0,0 +1,113 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.Collection; + +/** + * Distributed implementation of Bloom filter based on Highway 128-bit hash. + * + * @author Nikita Koksharov + * + * @param - type of object + */ +public interface RBloomFilterAsync extends RExpirableAsync { + + /** + * Adds element + * + * @param object - element to add + * @return true if element has been added successfully + * false if element is already present + */ + RFuture addAsync(T object); + + /** + * Adds elements + * + * @param elements elements to add + * @return number of added elements + */ + RFuture addAsync(Collection elements); + + /** + * Checks for element presence + * + * @param object element + * @return true if element is present + * false if element is not present + */ + RFuture containsAsync(T object); + + /** + * Checks for elements presence + * + * @param elements elements to check presence + * @return number of elements present + */ + RFuture containsAsync(Collection elements); + + /** + * Initializes Bloom filter params (size and hashIterations) + * calculated from expectedInsertions and falseProbability + * Stores config to Redis server. + * + * @param expectedInsertions - expected amount of insertions per element + * @param falseProbability - expected false probability + * @return true if Bloom filter initialized + * false if Bloom filter already has been initialized + */ + RFuture tryInitAsync(long expectedInsertions, double falseProbability); + + /** + * Returns expected amount of insertions per element. + * Calculated during bloom filter initialization. + * + * @return expected amount of insertions per element + */ + RFuture getExpectedInsertionsAsync(); + + /** + * Returns false probability of element presence. + * Calculated during bloom filter initialization. + * + * @return false probability of element presence + */ + RFuture getFalseProbabilityAsync(); + + /** + * Returns number of bits in Redis memory required by this instance + * + * @return number of bits + */ + RFuture getSizeAsync(); + + /** + * Returns hash iterations amount used per element. + * Calculated during bloom filter initialization. + * + * @return hash iterations amount + */ + RFuture getHashIterationsAsync(); + + /** + * Calculates probabilistic number of elements already added to Bloom filter. + * + * @return probabilistic number of elements + */ + RFuture countAsync(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RBloomFilterReactive.java b/redisson/src/main/java/org/redisson/api/RBloomFilterReactive.java new file mode 100644 index 000000000..08b990fb7 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RBloomFilterReactive.java @@ -0,0 +1,115 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import reactor.core.publisher.Mono; + +import java.util.Collection; + +/** + * Distributed implementation of Bloom filter based on Highway 128-bit hash. + * + * @author Nikita Koksharov + * + * @param - type of object + */ +public interface RBloomFilterReactive extends RExpirableReactive { + + /** + * Adds element + * + * @param object - element to add + * @return true if element has been added successfully + * false if element is already present + */ + Mono add(T object); + + /** + * Adds elements + * + * @param elements elements to add + * @return number of added elements + */ + Mono add(Collection elements); + + /** + * Checks for element presence + * + * @param object element + * @return true if element is present + * false if element is not present + */ + Mono contains(T object); + + /** + * Checks for elements presence + * + * @param elements elements to check presence + * @return number of elements present + */ + Mono contains(Collection elements); + + /** + * Initializes Bloom filter params (size and hashIterations) + * calculated from expectedInsertions and falseProbability + * Stores config to Redis server. + * + * @param expectedInsertions - expected amount of insertions per element + * @param falseProbability - expected false probability + * @return true if Bloom filter initialized + * false if Bloom filter already has been initialized + */ + Mono tryInit(long expectedInsertions, double falseProbability); + + /** + * Returns expected amount of insertions per element. + * Calculated during bloom filter initialization. + * + * @return expected amount of insertions per element + */ + Mono getExpectedInsertions(); + + /** + * Returns false probability of element presence. + * Calculated during bloom filter initialization. + * + * @return false probability of element presence + */ + Mono getFalseProbability(); + + /** + * Returns number of bits in Redis memory required by this instance + * + * @return number of bits + */ + Mono getSize(); + + /** + * Returns hash iterations amount used per element. + * Calculated during bloom filter initialization. + * + * @return hash iterations amount + */ + Mono getHashIterations(); + + /** + * Calculates probabilistic number of elements already added to Bloom filter. + * + * @return probabilistic number of elements + */ + Mono count(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RBloomFilterRx.java b/redisson/src/main/java/org/redisson/api/RBloomFilterRx.java new file mode 100644 index 000000000..0fdee738f --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RBloomFilterRx.java @@ -0,0 +1,115 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import io.reactivex.rxjava3.core.Single; + +import java.util.Collection; + +/** + * Distributed implementation of Bloom filter based on Highway 128-bit hash. + * + * @author Nikita Koksharov + * + * @param - type of object + */ +public interface RBloomFilterRx extends RExpirableRx { + + /** + * Adds element + * + * @param object - element to add + * @return true if element has been added successfully + * false if element is already present + */ + Single add(T object); + + /** + * Adds elements + * + * @param elements elements to add + * @return number of added elements + */ + Single add(Collection elements); + + /** + * Checks for element presence + * + * @param object element + * @return true if element is present + * false if element is not present + */ + Single contains(T object); + + /** + * Checks for elements presence + * + * @param elements elements to check presence + * @return number of elements present + */ + Single contains(Collection elements); + + /** + * Initializes Bloom filter params (size and hashIterations) + * calculated from expectedInsertions and falseProbability + * Stores config to Redis server. + * + * @param expectedInsertions - expected amount of insertions per element + * @param falseProbability - expected false probability + * @return true if Bloom filter initialized + * false if Bloom filter already has been initialized + */ + Single tryInit(long expectedInsertions, double falseProbability); + + /** + * Returns expected amount of insertions per element. + * Calculated during bloom filter initialization. + * + * @return expected amount of insertions per element + */ + Single getExpectedInsertions(); + + /** + * Returns false probability of element presence. + * Calculated during bloom filter initialization. + * + * @return false probability of element presence + */ + Single getFalseProbability(); + + /** + * Returns number of bits in Redis memory required by this instance + * + * @return number of bits + */ + Single getSize(); + + /** + * Returns hash iterations amount used per element. + * Calculated during bloom filter initialization. + * + * @return hash iterations amount + */ + Single getHashIterations(); + + /** + * Calculates probabilistic number of elements already added to Bloom filter. + * + * @return probabilistic number of elements + */ + Single count(); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index 4a883e252..92d9857bd 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -1361,6 +1361,35 @@ public interface RedissonReactiveClient { */ RBitSetReactive getBitSet(CommonOptions options); + /** + * Returns bloom filter instance by name. + * + * @param type of value + * @param name name of object + * @return BloomFilter object + */ + RBloomFilterReactive getBloomFilter(String name); + + /** + * Returns bloom filter instance by name + * using provided codec for objects. + * + * @param type of value + * @param name name of object + * @param codec codec for values + * @return BloomFilter object + */ + RBloomFilterReactive getBloomFilter(String name, Codec codec); + + /** + * Returns bloom filter instance with specified options. + * + * @param type of value + * @param options instance options + * @return BloomFilter object + */ + RBloomFilterReactive getBloomFilter(PlainOptions options); + /** * Returns interface for Redis Function feature * diff --git a/redisson/src/main/java/org/redisson/api/RedissonRxClient.java b/redisson/src/main/java/org/redisson/api/RedissonRxClient.java index 94dc66055..fc35c37d9 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonRxClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonRxClient.java @@ -1350,6 +1350,35 @@ public interface RedissonRxClient { */ RBitSetRx getBitSet(CommonOptions options); + /** + * Returns bloom filter instance by name. + * + * @param type of value + * @param name name of object + * @return BloomFilter object + */ + RBloomFilterRx getBloomFilter(String name); + + /** + * Returns bloom filter instance by name + * using provided codec for objects. + * + * @param type of value + * @param name name of object + * @param codec codec for values + * @return BloomFilter object + */ + RBloomFilterRx getBloomFilter(String name, Codec codec); + + /** + * Returns bloom filter instance with specified options. + * + * @param type of value + * @param options instance options + * @return BloomFilter object + */ + RBloomFilterRx getBloomFilter(PlainOptions options); + /** * Returns interface for Redis Function feature * diff --git a/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java b/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java index e888bd838..213b168d9 100644 --- a/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBloomFilterTest.java @@ -3,6 +3,7 @@ package org.redisson; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.RBloomFilter; +import org.redisson.client.RedisException; import java.time.Instant; import java.util.Arrays; @@ -89,7 +90,7 @@ public class RedissonBloomFilterTest extends RedisDockerTest { @Test public void testNotInitializedOnExpectedInsertions() { - Assertions.assertThrows(IllegalStateException.class, () -> { + Assertions.assertThrows(RedisException.class, () -> { RBloomFilter filter = redisson.getBloomFilter("filter"); filter.getExpectedInsertions(); }); @@ -111,7 +112,7 @@ public class RedissonBloomFilterTest extends RedisDockerTest { @Test public void testNotInitializedOnContains() { - Assertions.assertThrows(IllegalStateException.class, () -> { + Assertions.assertThrows(RedisException.class, () -> { RBloomFilter filter = redisson.getBloomFilter("filter"); filter.contains("32"); }); @@ -119,7 +120,7 @@ public class RedissonBloomFilterTest extends RedisDockerTest { @Test public void testNotInitializedOnAdd() { - Assertions.assertThrows(IllegalStateException.class, () -> { + Assertions.assertThrows(RedisException.class, () -> { RBloomFilter filter = redisson.getBloomFilter("filter"); filter.add("123"); });