From dcab532b84530f5c64db10826bf8bab03d957568 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 30 Dec 2015 14:25:11 +0300 Subject: [PATCH] RBloomFilter added. #190 --- src/main/java/org/redisson/Redisson.java | 24 +- .../org/redisson/RedissonBloomFilter.java | 295 ++++++++++++++++++ .../java/org/redisson/RedissonClient.java | 5 + .../java/org/redisson/RedissonMapCache.java | 2 +- .../java/org/redisson/RedissonObject.java | 2 +- .../java/org/redisson/RedissonSetCache.java | 2 +- .../redisson/client/codec/DoubleCodec.java | 42 +++ .../redisson/client/codec/IntegerCodec.java | 40 +++ .../client/protocol/RedisCommands.java | 8 +- .../convertor/BitSetReplayConvertor.java | 26 ++ .../redisson/command/CommandBatchService.java | 1 - .../java/org/redisson/core/RBloomFilter.java | 60 ++++ .../reactive/RedissonBitSetReactive.java | 6 +- .../reactive/RedissonMapCacheReactive.java | 2 +- .../reactive/RedissonObjectReactive.java | 2 +- .../reactive/RedissonSetCacheReactive.java | 2 +- .../org/redisson/RedissonBloomFilterTest.java | 68 ++++ 17 files changed, 567 insertions(+), 20 deletions(-) create mode 100644 src/main/java/org/redisson/RedissonBloomFilter.java create mode 100644 src/main/java/org/redisson/client/codec/DoubleCodec.java create mode 100644 src/main/java/org/redisson/client/codec/IntegerCodec.java create mode 100644 src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java create mode 100644 src/main/java/org/redisson/core/RBloomFilter.java create mode 100644 src/test/java/org/redisson/RedissonBloomFilterTest.java diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 5854c91ea..425188542 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -43,6 +43,7 @@ import org.redisson.core.RBatch; import org.redisson.core.RBitSet; import org.redisson.core.RBlockingDeque; import org.redisson.core.RBlockingQueue; +import org.redisson.core.RBloomFilter; import org.redisson.core.RBucket; import org.redisson.core.RCountDownLatch; import org.redisson.core.RDeque; @@ -212,8 +213,7 @@ public class Redisson implements RedissonClient { } } - Future future = commandExecutor.writeAsync(null, RedisCommands.MSET, params.toArray()); - commandExecutor.get(future); + commandExecutor.write(null, RedisCommands.MSET, params.toArray()); } @Override @@ -396,6 +396,21 @@ public class Redisson implements RedissonClient { return new RedissonBitSet(commandExecutor, name); } + @Override + public RSemaphore getSemaphore(String name) { + return new RedissonSemaphore(commandExecutor, name, id); + } + + @Override + public RBloomFilter getBloomFilter(String name) { + return new RedissonBloomFilter(commandExecutor, name); + } + + @Override + public RBloomFilter getBloomFilter(String name, Codec codec) { + return new RedissonBloomFilter(codec, commandExecutor, name); + } + @Override public RKeys getKeys() { return new RedissonKeys(commandExecutor); @@ -447,10 +462,5 @@ public class Redisson implements RedissonClient { return connectionManager.isShuttingDown(); } - @Override - public RSemaphore getSemaphore(String name) { - return new RedissonSemaphore(commandExecutor, name, id); - } - } diff --git a/src/main/java/org/redisson/RedissonBloomFilter.java b/src/main/java/org/redisson/RedissonBloomFilter.java new file mode 100644 index 000000000..5913939d3 --- /dev/null +++ b/src/main/java/org/redisson/RedissonBloomFilter.java @@ -0,0 +1,295 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.redisson.client.RedisException; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.codec.IntegerCodec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.VoidReplayConvertor; +import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; +import org.redisson.command.CommandBatchService; +import org.redisson.command.CommandExecutor; +import org.redisson.core.RBloomFilter; + +import io.netty.util.concurrent.Future; +import net.openhft.hashing.LongHashFunction; + +/** + * Bloom filter based on 64-bit hash derived from 128-bit hash (xxHash 64-bit + FarmHash 64-bit). + * + * Code parts from Guava BloomFilter + * + * @author Nikita Koksharov + * + * @param + */ +public class RedissonBloomFilter extends RedissonExpirable implements RBloomFilter { + + private static final long MAX_SIZE = Integer.MAX_VALUE*2L; + + private volatile long size; + private volatile int hashIterations; + + private final CommandExecutor commandExecutor; + + protected RedissonBloomFilter(CommandExecutor commandExecutor, String name) { + super(commandExecutor, name); + this.commandExecutor = commandExecutor; + } + + protected RedissonBloomFilter(Codec codec, CommandExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + this.commandExecutor = commandExecutor; + } + + private int optimalNumOfHashFunctions(long n, long m) { + return Math.max(1, (int) Math.round((double) m / n * Math.log(2))); + } + + private long optimalNumOfBits(long n, double p) { + if (p == 0) { + p = Double.MIN_VALUE; + } + return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); + } + + @Override + public boolean add(T object) { + byte[] state = encode(object); + + while (true) { + if (size == 0) { + readConfig(); + } + + int hashIterations = this.hashIterations; + long size = this.size; + + long[] indexes = hash(state, hashIterations, size); + + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); + addConfigCheck(hashIterations, size, executorService); + for (int i = 0; i < indexes.length; i++) { + executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), indexes[i], 1); + } + try { + List result = (List) executorService.execute(); + + for (Boolean val : result.subList(1, result.size()-1)) { + if (val) { + return true; + } + } + return false; + } catch (RedisException e) { + if (!e.getMessage().contains("Bloom filter config has been changed")) { + throw e; + } + } + } + } + + private long[] hash(byte[] state, int iterations, long size) { + long hash1 = LongHashFunction.xx_r39().hashBytes(state); + long hash2 = LongHashFunction.farmUo().hashBytes(state); + + long[] indexes = new long[iterations]; + long hash = hash1; + for (int i = 0; i < iterations; i++) { + indexes[i] = (hash & Long.MAX_VALUE) % size; + if (i % 2 == 0) { + hash += hash2; + } else { + hash += hash1; + } + } + return indexes; + } + + @Override + public boolean contains(T object) { + byte[] state = encode(object); + + while (true) { + if (size == 0) { + readConfig(); + } + + int hashIterations = this.hashIterations; + long size = this.size; + + long[] indexes = hash(state, hashIterations, size); + + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); + addConfigCheck(hashIterations, size, executorService); + for (int i = 0; i < indexes.length; i++) { + executorService.readAsync(getName(), codec, RedisCommands.GETBIT, getName(), indexes[i]); + } + try { + List result = (List) executorService.execute(); + + for (Boolean val : result.subList(1, result.size()-1)) { + if (!val) { + return false; + } + } + + return true; + } catch (RedisException e) { + if (!e.getMessage().contains("Bloom filter config has been changed")) { + throw e; + } + } + } + } + + private byte[] encode(T object) { + byte[] state = null; + try { + state = codec.getValueEncoder().encode(object); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + return state; + } + + private void addConfigCheck(int hashIterations, long size, CommandBatchService executorService) { + executorService.evalReadAsync(getConfigName(), codec, RedisCommands.EVAL_VOID, + "local size = redis.call('hget', KEYS[1], 'size');" + + "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + + "assert(size == ARGV[1] and hashIterations == ARGV[2], 'Bloom filter config has been changed')", + Arrays.asList(getConfigName()), size, hashIterations); + } + + @Override + public int count() { + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); + Future> configFuture = executorService.readAsync(getConfigName(), StringCodec.INSTANCE, + new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), getConfigName()); + Future cardinalityFuture = executorService.readAsync(getName(), codec, RedisCommands.BITCOUNT, getName()); + executorService.execute(); + + readConfig(configFuture.getNow()); + + return (int) (-size / ((double) hashIterations) * Math.log(1 - cardinalityFuture.getNow() / ((double) size))); + } + + @Override + public Future deleteAsync() { + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getConfigName()); + } + + private void readConfig() { + Future> future = commandExecutor.readAsync(getConfigName(), StringCodec.INSTANCE, + new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), getConfigName()); + Map config = commandExecutor.get(future); + + readConfig(config); + } + + private void readConfig(Map config) { + if (config.get("hashIterations") == null + || config.get("size") == null) { + throw new IllegalStateException("Bloom filter is not initialized!"); + } + size = Long.valueOf(config.get("size")); + hashIterations = Integer.valueOf(config.get("hashIterations")); + } + + @Override + public boolean tryInit(long expectedInsertions, double falseProbability) { + try { + readConfig(); + return false; + } catch (IllegalStateException e) { + // skip + } + + size = optimalNumOfBits(expectedInsertions, falseProbability); + if (size > MAX_SIZE) { + throw new IllegalArgumentException("Bloom filter can't be greater than " + MAX_SIZE + ". But calculated size is " + size); + } + hashIterations = optimalNumOfHashFunctions(expectedInsertions, size); + + CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); + executorService.evalReadAsync(getConfigName(), codec, RedisCommands.EVAL_VOID, + "local size = redis.call('hget', KEYS[1], 'size');" + + "local hashIterations = redis.call('hget', KEYS[1], 'hashIterations');" + + "assert(size == false and hashIterations == false, 'Bloom filter config has been changed')", + Arrays.asList(getConfigName()), size, hashIterations); + executorService.writeAsync(getConfigName(), StringCodec.INSTANCE, + new RedisCommand("HMSET", new VoidReplayConvertor()), getConfigName(), + "size", size, "hashIterations", hashIterations, + "expectedInsertions", expectedInsertions, "falseProbability", BigDecimal.valueOf(falseProbability).toPlainString()); + try { + executorService.execute(); + } catch (RedisException e) { + if (!e.getMessage().contains("Bloom filter config has been changed")) { + throw e; + } + return false; + } + + return true; + } + + private String getConfigName() { + return "{" + getName() + "}" + "__config"; + } + + @Override + public long getExpectedInsertions() { + Long result = commandExecutor.read(getConfigName(), LongCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "expectedInsertions"); + return check(result); + } + + @Override + public double getFalseProbability() { + Double result = commandExecutor.read(getConfigName(), DoubleCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "falseProbability"); + return check(result); + } + + @Override + public long getSize() { + Long result = commandExecutor.read(getConfigName(), LongCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "size"); + return check(result); + } + + @Override + public int getHashIterations() { + Integer result = commandExecutor.read(getConfigName(), IntegerCodec.INSTANCE, RedisCommands.HGET, getConfigName(), "hashIterations"); + return check(result); + } + + private V check(V result) { + if (result == null) { + throw new IllegalStateException("Bloom filter is not initialized!"); + } + return result; + } + +} diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index aa16b5dd0..76fc98f18 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -28,6 +28,7 @@ import org.redisson.core.RBatch; import org.redisson.core.RBitSet; import org.redisson.core.RBlockingDeque; import org.redisson.core.RBlockingQueue; +import org.redisson.core.RBloomFilter; import org.redisson.core.RBucket; import org.redisson.core.RMapCache; import org.redisson.core.RCountDownLatch; @@ -465,6 +466,10 @@ public interface RedissonClient { */ RBitSet getBitSet(String name); + RBloomFilter getBloomFilter(String name); + + RBloomFilter getBloomFilter(String name, Codec codec); + /** * Returns script operations object * diff --git a/src/main/java/org/redisson/RedissonMapCache.java b/src/main/java/org/redisson/RedissonMapCache.java index c924dea4d..f9f74523e 100644 --- a/src/main/java/org/redisson/RedissonMapCache.java +++ b/src/main/java/org/redisson/RedissonMapCache.java @@ -376,7 +376,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override public Future deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_SINGLE, getName(), getTimeoutSetName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); } @Override diff --git a/src/main/java/org/redisson/RedissonObject.java b/src/main/java/org/redisson/RedissonObject.java index 845350e52..a49ffa2d1 100644 --- a/src/main/java/org/redisson/RedissonObject.java +++ b/src/main/java/org/redisson/RedissonObject.java @@ -109,7 +109,7 @@ abstract class RedissonObject implements RObject { @Override public Future deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_SINGLE, getName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName()); } @Override diff --git a/src/main/java/org/redisson/RedissonSetCache.java b/src/main/java/org/redisson/RedissonSetCache.java index 42d2180b5..f1ae75302 100644 --- a/src/main/java/org/redisson/RedissonSetCache.java +++ b/src/main/java/org/redisson/RedissonSetCache.java @@ -500,7 +500,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< @Override public Future deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_SINGLE, getName(), getTimeoutSetName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); } @Override diff --git a/src/main/java/org/redisson/client/codec/DoubleCodec.java b/src/main/java/org/redisson/client/codec/DoubleCodec.java new file mode 100644 index 000000000..63728fee3 --- /dev/null +++ b/src/main/java/org/redisson/client/codec/DoubleCodec.java @@ -0,0 +1,42 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.codec; + +import java.math.BigDecimal; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class DoubleCodec extends StringCodec { + + public static final DoubleCodec INSTANCE = new DoubleCodec(); + + public final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) { + return new BigDecimal(buf.toString(CharsetUtil.UTF_8)).doubleValue(); + } + }; + + @Override + public Decoder getValueDecoder() { + return decoder; + } + +} diff --git a/src/main/java/org/redisson/client/codec/IntegerCodec.java b/src/main/java/org/redisson/client/codec/IntegerCodec.java new file mode 100644 index 000000000..a9501b09e --- /dev/null +++ b/src/main/java/org/redisson/client/codec/IntegerCodec.java @@ -0,0 +1,40 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.codec; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class IntegerCodec extends StringCodec { + + public static final IntegerCodec INSTANCE = new IntegerCodec(); + + public final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) { + return Integer.valueOf(buf.toString(CharsetUtil.UTF_8)); + } + }; + + @Override + public Decoder getValueDecoder() { + return decoder; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 0ce054300..98ffc33ae 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Set; import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.convertor.BitSetReplayConvertor; import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor; @@ -57,9 +58,10 @@ public interface RedisCommands { RedisStrictCommand GETBIT = new RedisStrictCommand("GETBIT", new BooleanReplayConvertor()); RedisStrictCommand BITS_SIZE = new RedisStrictCommand("STRLEN", new BitsSizeReplayConvertor()); RedisStrictCommand STRLEN = new RedisStrictCommand("STRLEN", new IntegerReplayConvertor()); - RedisStrictCommand BITCOUNT = new RedisStrictCommand("BITCOUNT", new IntegerReplayConvertor()); + RedisStrictCommand BITCOUNT = new RedisStrictCommand("BITCOUNT"); RedisStrictCommand BITPOS = new RedisStrictCommand("BITPOS", new IntegerReplayConvertor()); - RedisStrictCommand SETBIT = new RedisStrictCommand("SETBIT", new VoidReplayConvertor()); + RedisStrictCommand SETBIT_VOID = new RedisStrictCommand("SETBIT", new VoidReplayConvertor()); + RedisStrictCommand SETBIT = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor()); RedisStrictCommand BITOP = new RedisStrictCommand("BITOP", new VoidReplayConvertor()); RedisStrictCommand ASKING = new RedisStrictCommand("ASKING", new VoidReplayConvertor()); @@ -189,7 +191,7 @@ public interface RedisCommands { RedisStrictCommand DEL = new RedisStrictCommand("DEL"); RedisStrictCommand DBSIZE = new RedisStrictCommand("DBSIZE"); - RedisStrictCommand DEL_SINGLE = new RedisStrictCommand("DEL", new BooleanReplayConvertor()); + RedisStrictCommand DEL_BOOL = new RedisStrictCommand("DEL", new BooleanReplayConvertor()); RedisStrictCommand DEL_VOID = new RedisStrictCommand("DEL", new VoidReplayConvertor()); RedisCommand GET = new RedisCommand("GET"); diff --git a/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java b/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java new file mode 100644 index 000000000..e943b819a --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java @@ -0,0 +1,26 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.convertor; + +public class BitSetReplayConvertor extends SingleConvertor { + + @Override + public Boolean convert(Object obj) { + return Long.valueOf(0).equals(obj); + } + + +} diff --git a/src/main/java/org/redisson/command/CommandBatchService.java b/src/main/java/org/redisson/command/CommandBatchService.java index f7632b1d6..47b5721c8 100644 --- a/src/main/java/org/redisson/command/CommandBatchService.java +++ b/src/main/java/org/redisson/command/CommandBatchService.java @@ -35,7 +35,6 @@ import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.connection.ConnectionManager; import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource.Redirect; diff --git a/src/main/java/org/redisson/core/RBloomFilter.java b/src/main/java/org/redisson/core/RBloomFilter.java new file mode 100644 index 000000000..dab4fd49c --- /dev/null +++ b/src/main/java/org/redisson/core/RBloomFilter.java @@ -0,0 +1,60 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +/** + * Bloom filter based on 64-bit hash derived from 128-bit hash (xxHash + FarmHash). + * + * Code parts from Guava BloomFilter + * + * @author Nikita Koksharov + * + * @param + */ +public interface RBloomFilter extends RExpirable { + + boolean add(T object); + + boolean contains(T object); + + /** + * Initializes Bloom filter params (size and hashIterations) + * calculated from expectedInsertions and falseProbability + * Stores config to Redis server. + * + * @param expectedInsertions + * @param falseProbability + * @return true if Bloom filter initialized + * false if Bloom filter already has been initialized + */ + boolean tryInit(long expectedInsertions, double falseProbability); + + long getExpectedInsertions(); + + double getFalseProbability(); + + long getSize(); + + int getHashIterations(); + + /** + * Calculates number of elements already added to Bloom filter. + * + * @return + */ + int count(); + +} diff --git a/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java b/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java index 661239d25..2bf7ec879 100644 --- a/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java @@ -42,7 +42,7 @@ public class RedissonBitSetReactive extends RedissonExpirableReactive implements } public Publisher set(int bitIndex, boolean value) { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.SETBIT, getName(), bitIndex, value ? 1 : 0); + return commandExecutor.writeReactive(getName(), codec, RedisCommands.SETBIT_VOID, getName(), bitIndex, value ? 1 : 0); } public Publisher toByteArray() { @@ -100,7 +100,7 @@ public class RedissonBitSetReactive extends RedissonExpirableReactive implements public Publisher clear(int fromIndex, int toIndex) { CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); for (int i = fromIndex; i < toIndex; i++) { - executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0); + executorService.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), i, 0); } return new NettyFuturePublisher(executorService.executeAsyncVoid()); } @@ -119,7 +119,7 @@ public class RedissonBitSetReactive extends RedissonExpirableReactive implements public Publisher set(int fromIndex, int toIndex) { CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); for (int i = fromIndex; i < toIndex; i++) { - executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1); + executorService.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), i, 1); } return new NettyFuturePublisher(executorService.executeAsyncVoid()); } diff --git a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 76c4bdd68..cb0798434 100644 --- a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -355,7 +355,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im @Override public Publisher delete() { - return commandExecutor.writeReactive(getName(), RedisCommands.DEL_SINGLE, getName(), getTimeoutSetName()); + return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 3a2ab55eb..cbe42e6f9 100644 --- a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -77,7 +77,7 @@ abstract class RedissonObjectReactive implements RObjectReactive { @Override public Publisher delete() { - return commandExecutor.writeReactive(getName(), RedisCommands.DEL_SINGLE, getName()); + return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName()); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 2c313bc1d..b9d57690b 100644 --- a/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -357,7 +357,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple @Override public Publisher delete() { - return commandExecutor.writeReactive(getName(), RedisCommands.DEL_SINGLE, getName(), getTimeoutSetName()); + return commandExecutor.writeReactive(getName(), RedisCommands.DEL_BOOL, getName(), getTimeoutSetName()); } @Override diff --git a/src/test/java/org/redisson/RedissonBloomFilterTest.java b/src/test/java/org/redisson/RedissonBloomFilterTest.java new file mode 100644 index 000000000..8226ed215 --- /dev/null +++ b/src/test/java/org/redisson/RedissonBloomFilterTest.java @@ -0,0 +1,68 @@ +package org.redisson; + +import org.junit.Test; +import org.redisson.core.RBloomFilter; +import static org.assertj.core.api.Assertions.*; + +public class RedissonBloomFilterTest extends BaseTest { + + @Test + public void testConfig() { + RBloomFilter filter = redisson.getBloomFilter("filter"); + filter.tryInit(100, 0.03); + assertThat(filter.getExpectedInsertions()).isEqualTo(100); + assertThat(filter.getFalseProbability()).isEqualTo(0.03); + assertThat(filter.getHashIterations()).isEqualTo(5); + assertThat(filter.getSize()).isEqualTo(729); + } + + @Test + public void testInit() { + RBloomFilter filter = redisson.getBloomFilter("filter"); + assertThat(filter.tryInit(55000000L, 0.03)).isTrue(); + assertThat(filter.tryInit(55000001L, 0.03)).isFalse(); + + filter.delete(); + + assertThat(filter.tryInit(55000001L, 0.03)).isTrue(); + } + + @Test(expected = IllegalStateException.class) + public void testNotInitializedOnExpectedInsertions() { + RBloomFilter filter = redisson.getBloomFilter("filter"); + + filter.getExpectedInsertions(); + } + + @Test(expected = IllegalStateException.class) + public void testNotInitializedOnContains() { + RBloomFilter filter = redisson.getBloomFilter("filter"); + + filter.contains("32"); + } + + @Test(expected = IllegalStateException.class) + public void testNotInitializedOnAdd() { + RBloomFilter filter = redisson.getBloomFilter("filter"); + + filter.add("123"); + } + + @Test + public void test() { + RBloomFilter filter = redisson.getBloomFilter("filter"); + filter.tryInit(550000000L, 0.03); + + assertThat(filter.contains("123")).isFalse(); + assertThat(filter.add("123")).isTrue(); + assertThat(filter.contains("123")).isTrue(); + assertThat(filter.add("123")).isFalse(); + assertThat(filter.count()).isEqualTo(1); + + assertThat(filter.contains("hflgs;jl;ao1-32471320o31803-24")).isFalse(); + assertThat(filter.add("hflgs;jl;ao1-32471320o31803-24")).isTrue(); + assertThat(filter.contains("hflgs;jl;ao1-32471320o31803-24")).isTrue(); + assertThat(filter.count()).isEqualTo(2); + } + +}