Feature - Async, Rx, Reactive interfaces implemented for RBloomFilter object. #5836

pull/5863/head
Nikita Koksharov 9 months ago
parent 04b206cbc5
commit ce70bf3945

@ -29,22 +29,21 @@
package org.redisson; package org.redisson;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.redisson.api.RBitSetAsync;
import org.redisson.api.RBloomFilter; import org.redisson.api.RBloomFilter;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisException; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.*; import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash; import org.redisson.misc.Hash;
import java.math.BigDecimal;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -102,21 +101,29 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
} }
@Override @Override
public long add(Collection<T> objects) { public RFuture<Boolean> addAsync(T object) {
CompletionStage<Boolean> f = addAsync(Arrays.asList(object)).thenApply(r -> r > 0);
return new CompletableFutureWrapper<>(f);
}
@Override
public RFuture<Long> addAsync(Collection<T> objects) {
CompletionStage<Void> future = CompletableFuture.completedFuture(null);
if (size == 0) { if (size == 0) {
readConfig(); future = readConfigAsync();
} }
List<Long> allIndexes = index(objects); CompletionStage<Long> f = future.thenCompose(r -> {
List<Long> allIndexes = index(objects);
List<Object> params = new ArrayList<>(); List<Object> params = new ArrayList<>();
params.add(size); params.add(size);
params.add(hashIterations); params.add(hashIterations);
int s = allIndexes.size() / objects.size(); int s = allIndexes.size() / objects.size();
params.add(s); params.add(s);
params.addAll(allIndexes); params.addAll(allIndexes);
return get(commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG, return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local size = redis.call('hget', KEYS[1], 'size');" + "local size = redis.call('hget', KEYS[1], 'size');" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" + "assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" +
@ -137,7 +144,15 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
"end; " + "end; " +
"return c;", "return c;",
Arrays.asList(configName, getRawName()), Arrays.asList(configName, getRawName()),
params.toArray())); params.toArray());
});
return new CompletableFutureWrapper<>(f);
}
@Override
public long add(Collection<T> objects) {
return get(addAsync(objects));
} }
private long[] hash(long hash1, long hash2, int iterations, long size) { private long[] hash(long hash1, long hash2, int iterations, long size) {
@ -155,42 +170,51 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
} }
@Override @Override
public long contains(Collection<T> objects) { public RFuture<Long> containsAsync(Collection<T> objects) {
CompletionStage<Void> future = CompletableFuture.completedFuture(null);
if (size == 0) { if (size == 0) {
readConfig(); future = readConfigAsync();
} }
List<Long> allIndexes = index(objects); CompletionStage<Long> f = future.thenCompose(r -> {
List<Long> allIndexes = index(objects);
List<Object> params = new ArrayList<>();
params.add(size); List<Object> params = new ArrayList<>();
params.add(hashIterations); params.add(size);
params.add(objects.size()); params.add(hashIterations);
params.addAll(allIndexes); params.add(objects.size());
params.addAll(allIndexes);
return get(commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG,
"local size = redis.call('hget', KEYS[1], 'size');" + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + "local size = redis.call('hget', KEYS[1], 'size');" +
"assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" + "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" +
"assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')" +
"local k = 0;" +
"local c = 0;" + "local k = 0;" +
"local cc = (#ARGV - 3) / ARGV[3];" + "local c = 0;" +
"for i = 4, #ARGV, 1 do " + "local cc = (#ARGV - 3) / ARGV[3];" +
"local r = redis.call('getbit', KEYS[2], ARGV[i]); " + "for i = 4, #ARGV, 1 do " +
"if r == 0 then " + "local r = redis.call('getbit', KEYS[2], ARGV[i]); " +
"k = k + 1;" + "if r == 0 then " +
"end; " + "k = k + 1;" +
"if ((i - 4) + 1) % cc == 0 then " + "end; " +
"if k > 0 then " + "if ((i - 4) + 1) % cc == 0 then " +
"c = c + 1;" + "if k > 0 then " +
"c = c + 1;" +
"end; " +
"k = 0; " +
"end; " +
"end; " + "end; " +
"k = 0; " + "return ARGV[3] - c;",
"end; " + Arrays.asList(configName, getRawName()),
"end; " + params.toArray());
"return ARGV[3] - c;", });
Arrays.asList(configName, getRawName()), return new CompletableFutureWrapper<>(f);
params.toArray())); }
@Override
public long contains(Collection<T> objects) {
return get(containsAsync(objects));
} }
private List<Long> index(Collection<T> objects) { private List<Long> index(Collection<T> objects) {
@ -208,22 +232,27 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
return contains(Arrays.asList(object)) > 0; return contains(Arrays.asList(object)) > 0;
} }
protected RBitSetAsync createBitSet(CommandBatchService executorService) { @Override
return new RedissonBitSet(executorService, getName()); public RFuture<Boolean> containsAsync(T object) {
CompletionStage<Boolean> f = containsAsync(Arrays.asList(object)).thenApply(r -> r > 0);
return new CompletableFutureWrapper<>(f);
} }
@Override @Override
public long count() { public long count() {
CommandBatchService executorService = new CommandBatchService(commandExecutor); return get(countAsync());
RFuture<Map<String, String>> configFuture = executorService.readAsync(configName, StringCodec.INSTANCE, }
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), configName);
RBitSetAsync bs = createBitSet(executorService);
RFuture<Long> cardinalityFuture = bs.cardinalityAsync();
executorService.execute();
readConfig(commandExecutor.getNow(configFuture.toCompletableFuture()));
return Math.round(-size / ((double) hashIterations) * Math.log(1 - commandExecutor.getNow(cardinalityFuture.toCompletableFuture()) / ((double) size))); @Override
public RFuture<Long> countAsync() {
CompletionStage<Void> f = readConfigAsync();
CompletionStage<Long> res = f.thenCompose(r -> {
RedissonBitSet bs = new RedissonBitSet(commandExecutor, getName());
return bs.cardinalityAsync().thenApply(c -> {
return Math.round(-size / ((double) hashIterations) * Math.log(1 - c / ((double) size)));
});
});
return new CompletableFutureWrapper<>(res);
} }
@Override @Override
@ -237,12 +266,12 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
return super.sizeInMemoryAsync(keys); return super.sizeInMemoryAsync(keys);
} }
private void readConfig() { private CompletionStage<Void> readConfigAsync() {
RFuture<Map<String, String>> future = commandExecutor.readAsync(configName, StringCodec.INSTANCE, RFuture<Map<String, String>> future = commandExecutor.readAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), configName); new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder()), configName);
Map<String, String> config = commandExecutor.get(future); return future.thenAccept(config -> {
readConfig(config);
readConfig(config); });
} }
private void readConfig(Map<String, String> config) { private void readConfig(Map<String, String> config) {
@ -260,6 +289,11 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override @Override
public boolean tryInit(long expectedInsertions, double falseProbability) { public boolean tryInit(long expectedInsertions, double falseProbability) {
return get(tryInitAsync(expectedInsertions, falseProbability));
}
@Override
public RFuture<Boolean> tryInitAsync(long expectedInsertions, double falseProbability) {
if (falseProbability > 1) { if (falseProbability > 1) {
throw new IllegalArgumentException("Bloom filter false probability can't be greater than 1"); throw new IllegalArgumentException("Bloom filter false probability can't be greater than 1");
} }
@ -276,29 +310,21 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
} }
hashIterations = optimalNumOfHashFunctions(expectedInsertions, size); hashIterations = optimalNumOfHashFunctions(expectedInsertions, size);
CommandBatchService executorService = new CommandBatchService(commandExecutor); return commandExecutor.evalWriteAsync(configName, StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
executorService.evalReadAsync(configName, codec, RedisCommands.EVAL_VOID, "if redis.call('exists', KEYS[1]) == 1 then " +
"local size = redis.call('hget', KEYS[1], 'size');" + "return 0;" +
"local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + "end; " +
"assert(size == false and hashIterations == false, 'Bloom filter config has been changed')",
Arrays.<Object>asList(configName), size, hashIterations);
executorService.writeAsync(configName, StringCodec.INSTANCE,
new RedisCommand<Void>("HMSET", new VoidReplayConvertor()), configName,
"size", size, "hashIterations", hashIterations,
"expectedInsertions", expectedInsertions, "falseProbability", BigDecimal.valueOf(falseProbability).toPlainString());
try {
executorService.execute();
} catch (RedisException e) {
if (e.getMessage() == null || !e.getMessage().contains("Bloom filter config has been changed")) {
throw e;
}
readConfig();
return false;
}
return true; "redis.call('hset', KEYS[1], 'size', ARGV[1]);" +
"redis.call('hset', KEYS[1], 'hashIterations', ARGV[2]);" +
"redis.call('hset', KEYS[1], 'expectedInsertions', ARGV[3]);" +
"redis.call('hset', KEYS[1], 'falseProbability', ARGV[4]);" +
"return 1;",
Arrays.asList(configName),
size, hashIterations, expectedInsertions, falseProbability);
} }
@Override @Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) { public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit, String param, String... keys) {
return super.expireAsync(timeToLive, timeUnit, param, getRawName(), configName); return super.expireAsync(timeToLive, timeUnit, param, getRawName(), configName);
@ -316,26 +342,53 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
@Override @Override
public long getExpectedInsertions() { public long getExpectedInsertions() {
Long result = get(commandExecutor.readAsync(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "expectedInsertions")); return get(getExpectedInsertionsAsync());
return check(result); }
@Override
public RFuture<Long> getExpectedInsertionsAsync() {
return readSettingAsync(RedisCommands.EVAL_LONG, LongCodec.INSTANCE, "expectedInsertions");
}
private <T> RFuture<T> readSettingAsync(RedisCommand<T> evalCommandType, Codec codec, String settingName) {
return commandExecutor.evalReadAsync(configName, codec, evalCommandType,
"if redis.call('exists', KEYS[1]) == 0 then " +
"assert(false, 'Bloom filter is not initialized')" +
"end; " +
"return redis.call('hget', KEYS[1], ARGV[1]);",
Arrays.asList(configName),
settingName);
} }
@Override @Override
public double getFalseProbability() { public double getFalseProbability() {
Double result = get(commandExecutor.readAsync(configName, DoubleCodec.INSTANCE, RedisCommands.HGET, configName, "falseProbability")); return get(getFalseProbabilityAsync());
return check(result); }
@Override
public RFuture<Double> getFalseProbabilityAsync() {
return readSettingAsync(RedisCommands.EVAL_DOUBLE, DoubleCodec.INSTANCE, "falseProbability");
} }
@Override @Override
public long getSize() { public long getSize() {
Long result = get(commandExecutor.readAsync(configName, LongCodec.INSTANCE, RedisCommands.HGET, configName, "size")); return get(getSizeAsync());
return check(result); }
@Override
public RFuture<Long> getSizeAsync() {
return readSettingAsync(RedisCommands.EVAL_LONG, LongCodec.INSTANCE, "size");
} }
@Override @Override
public int getHashIterations() { public int getHashIterations() {
Integer result = get(commandExecutor.readAsync(configName, IntegerCodec.INSTANCE, RedisCommands.HGET, configName, "hashIterations")); return get(getHashIterationsAsync());
return check(result); }
@Override
public RFuture<Integer> getHashIterationsAsync() {
return readSettingAsync(RedisCommands.EVAL_INTEGER, LongCodec.INSTANCE, "hashIterations");
} }
@Override @Override
@ -381,11 +434,4 @@ public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomF
return new CompletableFutureWrapper<>(f); return new CompletableFutureWrapper<>(f);
} }
private <V> V check(V result) {
if (result == null) {
throw new IllegalStateException("Bloom filter is not initialized!");
}
return result;
}
} }

@ -870,6 +870,24 @@ public class RedissonReactive implements RedissonReactiveClient {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(ca, params.getName()), RBitSetReactive.class); return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(ca, params.getName()), RBitSetReactive.class);
} }
@Override
public <V> RBloomFilterReactive<V> getBloomFilter(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(commandExecutor, name), RBloomFilterReactive.class);
}
@Override
public <V> RBloomFilterReactive<V> getBloomFilter(String name, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(codec, commandExecutor, name), RBloomFilterReactive.class);
}
@Override
public <V> RBloomFilterReactive<V> getBloomFilter(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandReactiveExecutor ca = commandExecutor.copy(params);
return ReactiveProxyBuilder.create(commandExecutor,
new RedissonBloomFilter<V>(params.getCodec(), ca, params.getName()), RBloomFilterReactive.class);
}
@Override @Override
public RFunctionReactive getFunction() { public RFunctionReactive getFunction() {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor), RFunctionReactive.class); return ReactiveProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor), RFunctionReactive.class);

@ -873,6 +873,24 @@ public class RedissonRx implements RedissonRxClient {
return RxProxyBuilder.create(commandExecutor, new RedissonBitSet(ce, params.getName()), RBitSetRx.class); return RxProxyBuilder.create(commandExecutor, new RedissonBitSet(ce, params.getName()), RBitSetRx.class);
} }
@Override
public <V> RBloomFilterRx<V> getBloomFilter(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(commandExecutor, name), RBloomFilterRx.class);
}
@Override
public <V> RBloomFilterRx<V> getBloomFilter(String name, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonBloomFilter<>(codec, commandExecutor, name), RBloomFilterRx.class);
}
@Override
public <V> RBloomFilterRx<V> getBloomFilter(PlainOptions options) {
PlainParams params = (PlainParams) options;
CommandRxExecutor ce = commandExecutor.copy(params);
return RxProxyBuilder.create(commandExecutor,
new RedissonBloomFilter<V>(params.getCodec(), ce, params.getName()), RBloomFilterRx.class);
}
@Override @Override
public RFunctionRx getFunction() { public RFunctionRx getFunction() {
return RxProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor), RFunctionRx.class); return RxProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor), RFunctionRx.class);

@ -24,7 +24,7 @@ import java.util.Collection;
* *
* @param <T> - type of object * @param <T> - type of object
*/ */
public interface RBloomFilter<T> extends RExpirable { public interface RBloomFilter<T> extends RExpirable, RBloomFilterAsync<T> {
/** /**
* Adds element * Adds element

@ -0,0 +1,113 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
/**
* Distributed implementation of Bloom filter based on Highway 128-bit hash.
*
* @author Nikita Koksharov
*
* @param <T> - type of object
*/
public interface RBloomFilterAsync<T> extends RExpirableAsync {
/**
* Adds element
*
* @param object - element to add
* @return <code>true</code> if element has been added successfully
* <code>false</code> if element is already present
*/
RFuture<Boolean> addAsync(T object);
/**
* Adds elements
*
* @param elements elements to add
* @return number of added elements
*/
RFuture<Long> addAsync(Collection<T> elements);
/**
* Checks for element presence
*
* @param object element
* @return <code>true</code> if element is present
* <code>false</code> if element is not present
*/
RFuture<Boolean> containsAsync(T object);
/**
* Checks for elements presence
*
* @param elements elements to check presence
* @return number of elements present
*/
RFuture<Long> containsAsync(Collection<T> elements);
/**
* Initializes Bloom filter params (size and hashIterations)
* calculated from <code>expectedInsertions</code> and <code>falseProbability</code>
* Stores config to Redis server.
*
* @param expectedInsertions - expected amount of insertions per element
* @param falseProbability - expected false probability
* @return <code>true</code> if Bloom filter initialized
* <code>false</code> if Bloom filter already has been initialized
*/
RFuture<Boolean> tryInitAsync(long expectedInsertions, double falseProbability);
/**
* Returns expected amount of insertions per element.
* Calculated during bloom filter initialization.
*
* @return expected amount of insertions per element
*/
RFuture<Long> getExpectedInsertionsAsync();
/**
* Returns false probability of element presence.
* Calculated during bloom filter initialization.
*
* @return false probability of element presence
*/
RFuture<Double> getFalseProbabilityAsync();
/**
* Returns number of bits in Redis memory required by this instance
*
* @return number of bits
*/
RFuture<Long> getSizeAsync();
/**
* Returns hash iterations amount used per element.
* Calculated during bloom filter initialization.
*
* @return hash iterations amount
*/
RFuture<Integer> getHashIterationsAsync();
/**
* Calculates probabilistic number of elements already added to Bloom filter.
*
* @return probabilistic number of elements
*/
RFuture<Long> countAsync();
}

@ -0,0 +1,115 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import reactor.core.publisher.Mono;
import java.util.Collection;
/**
* Distributed implementation of Bloom filter based on Highway 128-bit hash.
*
* @author Nikita Koksharov
*
* @param <T> - type of object
*/
public interface RBloomFilterReactive<T> extends RExpirableReactive {
/**
* Adds element
*
* @param object - element to add
* @return <code>true</code> if element has been added successfully
* <code>false</code> if element is already present
*/
Mono<Boolean> add(T object);
/**
* Adds elements
*
* @param elements elements to add
* @return number of added elements
*/
Mono<Long> add(Collection<T> elements);
/**
* Checks for element presence
*
* @param object element
* @return <code>true</code> if element is present
* <code>false</code> if element is not present
*/
Mono<Boolean> contains(T object);
/**
* Checks for elements presence
*
* @param elements elements to check presence
* @return number of elements present
*/
Mono<Long> contains(Collection<T> elements);
/**
* Initializes Bloom filter params (size and hashIterations)
* calculated from <code>expectedInsertions</code> and <code>falseProbability</code>
* Stores config to Redis server.
*
* @param expectedInsertions - expected amount of insertions per element
* @param falseProbability - expected false probability
* @return <code>true</code> if Bloom filter initialized
* <code>false</code> if Bloom filter already has been initialized
*/
Mono<Boolean> tryInit(long expectedInsertions, double falseProbability);
/**
* Returns expected amount of insertions per element.
* Calculated during bloom filter initialization.
*
* @return expected amount of insertions per element
*/
Mono<Long> getExpectedInsertions();
/**
* Returns false probability of element presence.
* Calculated during bloom filter initialization.
*
* @return false probability of element presence
*/
Mono<Double> getFalseProbability();
/**
* Returns number of bits in Redis memory required by this instance
*
* @return number of bits
*/
Mono<Long> getSize();
/**
* Returns hash iterations amount used per element.
* Calculated during bloom filter initialization.
*
* @return hash iterations amount
*/
Mono<Integer> getHashIterations();
/**
* Calculates probabilistic number of elements already added to Bloom filter.
*
* @return probabilistic number of elements
*/
Mono<Long> count();
}

@ -0,0 +1,115 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import io.reactivex.rxjava3.core.Single;
import java.util.Collection;
/**
* Distributed implementation of Bloom filter based on Highway 128-bit hash.
*
* @author Nikita Koksharov
*
* @param <T> - type of object
*/
public interface RBloomFilterRx<T> extends RExpirableRx {
/**
* Adds element
*
* @param object - element to add
* @return <code>true</code> if element has been added successfully
* <code>false</code> if element is already present
*/
Single<Boolean> add(T object);
/**
* Adds elements
*
* @param elements elements to add
* @return number of added elements
*/
Single<Long> add(Collection<T> elements);
/**
* Checks for element presence
*
* @param object element
* @return <code>true</code> if element is present
* <code>false</code> if element is not present
*/
Single<Boolean> contains(T object);
/**
* Checks for elements presence
*
* @param elements elements to check presence
* @return number of elements present
*/
Single<Long> contains(Collection<T> elements);
/**
* Initializes Bloom filter params (size and hashIterations)
* calculated from <code>expectedInsertions</code> and <code>falseProbability</code>
* Stores config to Redis server.
*
* @param expectedInsertions - expected amount of insertions per element
* @param falseProbability - expected false probability
* @return <code>true</code> if Bloom filter initialized
* <code>false</code> if Bloom filter already has been initialized
*/
Single<Boolean> tryInit(long expectedInsertions, double falseProbability);
/**
* Returns expected amount of insertions per element.
* Calculated during bloom filter initialization.
*
* @return expected amount of insertions per element
*/
Single<Long> getExpectedInsertions();
/**
* Returns false probability of element presence.
* Calculated during bloom filter initialization.
*
* @return false probability of element presence
*/
Single<Double> getFalseProbability();
/**
* Returns number of bits in Redis memory required by this instance
*
* @return number of bits
*/
Single<Long> getSize();
/**
* Returns hash iterations amount used per element.
* Calculated during bloom filter initialization.
*
* @return hash iterations amount
*/
Single<Integer> getHashIterations();
/**
* Calculates probabilistic number of elements already added to Bloom filter.
*
* @return probabilistic number of elements
*/
Single<Long> count();
}

@ -1361,6 +1361,35 @@ public interface RedissonReactiveClient {
*/ */
RBitSetReactive getBitSet(CommonOptions options); RBitSetReactive getBitSet(CommonOptions options);
/**
* Returns bloom filter instance by name.
*
* @param <V> type of value
* @param name name of object
* @return BloomFilter object
*/
<V> RBloomFilterReactive<V> getBloomFilter(String name);
/**
* Returns bloom filter instance by name
* using provided codec for objects.
*
* @param <V> type of value
* @param name name of object
* @param codec codec for values
* @return BloomFilter object
*/
<V> RBloomFilterReactive<V> getBloomFilter(String name, Codec codec);
/**
* Returns bloom filter instance with specified <code>options</code>.
*
* @param <V> type of value
* @param options instance options
* @return BloomFilter object
*/
<V> RBloomFilterReactive<V> getBloomFilter(PlainOptions options);
/** /**
* Returns interface for Redis Function feature * Returns interface for Redis Function feature
* *

@ -1350,6 +1350,35 @@ public interface RedissonRxClient {
*/ */
RBitSetRx getBitSet(CommonOptions options); RBitSetRx getBitSet(CommonOptions options);
/**
* Returns bloom filter instance by name.
*
* @param <V> type of value
* @param name name of object
* @return BloomFilter object
*/
<V> RBloomFilterRx<V> getBloomFilter(String name);
/**
* Returns bloom filter instance by name
* using provided codec for objects.
*
* @param <V> type of value
* @param name name of object
* @param codec codec for values
* @return BloomFilter object
*/
<V> RBloomFilterRx<V> getBloomFilter(String name, Codec codec);
/**
* Returns bloom filter instance with specified <code>options</code>.
*
* @param <V> type of value
* @param options instance options
* @return BloomFilter object
*/
<V> RBloomFilterRx<V> getBloomFilter(PlainOptions options);
/** /**
* Returns interface for Redis Function feature * Returns interface for Redis Function feature
* *

@ -3,6 +3,7 @@ package org.redisson;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.RBloomFilter; import org.redisson.api.RBloomFilter;
import org.redisson.client.RedisException;
import java.time.Instant; import java.time.Instant;
import java.util.Arrays; import java.util.Arrays;
@ -89,7 +90,7 @@ public class RedissonBloomFilterTest extends RedisDockerTest {
@Test @Test
public void testNotInitializedOnExpectedInsertions() { public void testNotInitializedOnExpectedInsertions() {
Assertions.assertThrows(IllegalStateException.class, () -> { Assertions.assertThrows(RedisException.class, () -> {
RBloomFilter<String> filter = redisson.getBloomFilter("filter"); RBloomFilter<String> filter = redisson.getBloomFilter("filter");
filter.getExpectedInsertions(); filter.getExpectedInsertions();
}); });
@ -111,7 +112,7 @@ public class RedissonBloomFilterTest extends RedisDockerTest {
@Test @Test
public void testNotInitializedOnContains() { public void testNotInitializedOnContains() {
Assertions.assertThrows(IllegalStateException.class, () -> { Assertions.assertThrows(RedisException.class, () -> {
RBloomFilter<String> filter = redisson.getBloomFilter("filter"); RBloomFilter<String> filter = redisson.getBloomFilter("filter");
filter.contains("32"); filter.contains("32");
}); });
@ -119,7 +120,7 @@ public class RedissonBloomFilterTest extends RedisDockerTest {
@Test @Test
public void testNotInitializedOnAdd() { public void testNotInitializedOnAdd() {
Assertions.assertThrows(IllegalStateException.class, () -> { Assertions.assertThrows(RedisException.class, () -> {
RBloomFilter<String> filter = redisson.getBloomFilter("filter"); RBloomFilter<String> filter = redisson.getBloomFilter("filter");
filter.add("123"); filter.add("123");
}); });

Loading…
Cancel
Save