diff --git a/src/main/java/org/redisson/RedissonBitSetReactive.java b/src/main/java/org/redisson/RedissonBitSetReactive.java new file mode 100644 index 000000000..d2cae8cd0 --- /dev/null +++ b/src/main/java/org/redisson/RedissonBitSetReactive.java @@ -0,0 +1,172 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collections; +import java.util.List; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.BitSetCodec; +import org.redisson.client.codec.ByteArrayCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandBatchAsyncService; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.core.RBitSetReactive; + +import reactor.rx.Streams; + +public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive { + + protected RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) { + super(connectionManager, name); + } + + public Publisher get(int bitIndex) { + return commandExecutor.readObservable(getName(), codec, RedisCommands.GETBIT, getName(), bitIndex); + } + + public Publisher set(int bitIndex, boolean value) { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.SETBIT, getName(), bitIndex, value ? 1 : 0); + } + + public Publisher toByteArray() { + return commandExecutor.readObservable(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName()); + } + + private Publisher op(String op, String... bitSetNames) { + List params = new ArrayList(bitSetNames.length + 3); + params.add(op); + params.add(getName()); + params.add(getName()); + params.addAll(Arrays.asList(bitSetNames)); + return commandExecutor.writeObservable(getName(), codec, RedisCommands.BITOP, params.toArray()); + } + + public Publisher asBitSet() { + return commandExecutor.readObservable(getName(), BitSetCodec.INSTANCE, RedisCommands.GET, getName()); + } + + //Copied from: https://github.com/xetorthio/jedis/issues/301 + private static byte[] toByteArrayReverse(BitSet bits) { + byte[] bytes = new byte[bits.length() / 8 + 1]; + for (int i = 0; i < bits.length(); i++) { + if (bits.get(i)) { + final int value = bytes[i / 8] | (1 << (7 - (i % 8))); + bytes[i / 8] = (byte) value; + } + } + return bytes; + } + + @Override + public Publisher length() { + return commandExecutor.evalReadObservable(getName(), codec, RedisCommands.EVAL_INTEGER, + "local fromBit = redis.call('bitpos', KEYS[1], 1, -1);" + + "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;" + + "for i = toBit, fromBit, -1 do " + + "if redis.call('getbit', KEYS[1], i) == 1 then " + + "return i+1;" + + "end;" + + "end;" + + "return fromBit+1", + Collections.singletonList(getName())); + } + + @Override + public Publisher set(int fromIndex, int toIndex, boolean value) { + if (value) { + return set(fromIndex, toIndex); + } + return clear(fromIndex, toIndex); + } + + @Override + public Publisher clear(int fromIndex, int toIndex) { + CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager()); + for (int i = fromIndex; i < toIndex; i++) { + executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0); + } + return new NettyFuturePublisher(executorService.executeAsyncVoid()); + } + + @Override + public Publisher set(BitSet bs) { + return commandExecutor.writeObservable(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs)); + } + + @Override + public Publisher not() { + return op("NOT"); + } + + @Override + public Publisher set(int fromIndex, int toIndex) { + CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager()); + for (int i = fromIndex; i < toIndex; i++) { + executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1); + } + return new NettyFuturePublisher(executorService.executeAsyncVoid()); + } + + @Override + public Publisher size() { + return commandExecutor.readObservable(getName(), codec, RedisCommands.BITS_SIZE, getName()); + } + + @Override + public Publisher set(int bitIndex) { + return set(bitIndex, true); + } + + @Override + public Publisher cardinality() { + return commandExecutor.readObservable(getName(), codec, RedisCommands.BITCOUNT, getName()); + } + + @Override + public Publisher clear(int bitIndex) { + return set(bitIndex, false); + } + + @Override + public Publisher clear() { + return commandExecutor.writeObservable(getName(), RedisCommands.DEL_VOID, getName()); + } + + @Override + public Publisher or(String... bitSetNames) { + return op("OR", bitSetNames); + } + + @Override + public Publisher and(String... bitSetNames) { + return op("AND", bitSetNames); + } + + @Override + public Publisher xor(String... bitSetNames) { + return op("XOR", bitSetNames); + } + + @Override + public String toString() { + return Streams.create(asBitSet()).next().poll().toString(); + } + +} diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index e0cbda282..4c5547e23 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -30,6 +30,7 @@ import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SingleConnectionManager; import org.redisson.core.RAtomicLongReactive; +import org.redisson.core.RBitSetReactive; import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; import org.redisson.core.RDequeReactive; @@ -220,6 +221,11 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonAtomicLongReactive(commandExecutor, name); } + @Override + public RBitSetReactive getBitSet(String name) { + return new RedissonBitSetReactive(commandExecutor, name); + } + public Config getConfig() { return config; } @@ -238,6 +244,5 @@ public class RedissonReactive implements RedissonReactiveClient { public void flushall() { commandExecutor.get(commandExecutor.writeAllAsync(RedisCommands.FLUSHALL)); } - } diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index fef72feb4..76a6e8d92 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -19,6 +19,7 @@ import java.util.List; import org.redisson.client.codec.Codec; import org.redisson.core.RAtomicLongReactive; +import org.redisson.core.RBitSetReactive; import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; import org.redisson.core.RDequeReactive; @@ -174,16 +175,8 @@ public interface RedissonReactiveClient { */ RAtomicLongReactive getAtomicLong(String name); -// /** -// * Returns "count down latch" instance by name. -// * -// * @param name of the "count down latch" -// * @return -// */ -// RCountDownLatch getCountDownLatch(String name); -// -// RBitSet getBitSet(String name); -// + RBitSetReactive getBitSet(String name); + // /** // * Returns script operations object // * diff --git a/src/main/java/org/redisson/client/codec/BitSetCodec.java b/src/main/java/org/redisson/client/codec/BitSetCodec.java new file mode 100644 index 000000000..3423530fe --- /dev/null +++ b/src/main/java/org/redisson/client/codec/BitSetCodec.java @@ -0,0 +1,79 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.codec; + +import java.util.BitSet; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; + +import io.netty.buffer.ByteBuf; + +public class BitSetCodec implements Codec { + + public static final BitSetCodec INSTANCE = new BitSetCodec(); + + private final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) { + byte[] result = new byte[buf.readableBytes()]; + buf.readBytes(result); + return fromByteArrayReverse(result); + } + }; + + private static BitSet fromByteArrayReverse(byte[] bytes) { + BitSet bits = new BitSet(); + for (int i = 0; i < bytes.length * 8; i++) { + if ((bytes[i / 8] & (1 << (7 - (i % 8)))) != 0) { + bits.set(i); + } + } + return bits; + } + + @Override + public Decoder getValueDecoder() { + return decoder; + } + + @Override + public Encoder getValueEncoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Decoder getMapValueDecoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Encoder getMapValueEncoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Decoder getMapKeyDecoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Encoder getMapKeyEncoder() { + throw new UnsupportedOperationException(); + } + +} diff --git a/src/main/java/org/redisson/core/RBitSetReactive.java b/src/main/java/org/redisson/core/RBitSetReactive.java new file mode 100644 index 000000000..bae5cc7ac --- /dev/null +++ b/src/main/java/org/redisson/core/RBitSetReactive.java @@ -0,0 +1,65 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.BitSet; + +import org.reactivestreams.Publisher; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RBitSetReactive extends RExpirableReactive { + + Publisher asBitSet(); + + Publisher toByteArray(); + + Publisher length(); + + Publisher set(int fromIndex, int toIndex, boolean value); + + Publisher clear(int fromIndex, int toIndex); + + Publisher set(BitSet bs); + + Publisher not(); + + Publisher set(int fromIndex, int toIndex); + + Publisher size(); + + Publisher get(int bitIndex); + + Publisher set(int bitIndex); + + Publisher set(int bitIndex, boolean value); + + Publisher cardinality(); + + Publisher clear(int bitIndex); + + Publisher clear(); + + Publisher or(String... bitSetNames); + + Publisher and(String... bitSetNames); + + Publisher xor(String... bitSetNames); + +} diff --git a/src/test/java/org/redisson/RedissonBitSetReactiveTest.java b/src/test/java/org/redisson/RedissonBitSetReactiveTest.java new file mode 100644 index 000000000..eb22fab29 --- /dev/null +++ b/src/test/java/org/redisson/RedissonBitSetReactiveTest.java @@ -0,0 +1,130 @@ +package org.redisson; + +import java.util.BitSet; + +import org.junit.Assert; +import org.junit.Test; +import org.redisson.core.RBitSetReactive; + +public class RedissonBitSetReactiveTest extends BaseReactiveTest { + + @Test + public void testLength() { + RBitSetReactive bs = redisson.getBitSet("testbitset"); + sync(bs.set(0, 5)); + sync(bs.clear(0, 1)); + Assert.assertEquals(5, sync(bs.length()).intValue()); + + sync(bs.clear()); + sync(bs.set(28)); + sync(bs.set(31)); + Assert.assertEquals(32, sync(bs.length()).intValue()); + + sync(bs.clear()); + sync(bs.set(3)); + sync(bs.set(7)); + Assert.assertEquals(8, sync(bs.length()).intValue()); + + sync(bs.clear()); + sync(bs.set(3)); + sync(bs.set(120)); + sync(bs.set(121)); + Assert.assertEquals(122, sync(bs.length()).intValue()); + + sync(bs.clear()); + sync(bs.set(0)); + Assert.assertEquals(1, sync(bs.length()).intValue()); + } + + @Test + public void testClear() { + RBitSetReactive bs = redisson.getBitSet("testbitset"); + sync(bs.set(0, 8)); + sync(bs.clear(0, 3)); + Assert.assertEquals("{3, 4, 5, 6, 7}", bs.toString()); + } + + @Test + public void testNot() { + RBitSetReactive bs = redisson.getBitSet("testbitset"); + sync(bs.set(3)); + sync(bs.set(5)); + sync(bs.not()); + Assert.assertEquals("{0, 1, 2, 4, 6, 7}", bs.toString()); + } + + @Test + public void testSet() { + RBitSetReactive bs = redisson.getBitSet("testbitset"); + sync(bs.set(3)); + sync(bs.set(5)); + Assert.assertEquals("{3, 5}", bs.toString()); + + BitSet bs1 = new BitSet(); + bs1.set(1); + bs1.set(10); + sync(bs.set(bs1)); + + bs = redisson.getBitSet("testbitset"); + + Assert.assertEquals("{1, 10}", bs.toString()); + } + + @Test + public void testSetGet() { + RBitSetReactive bitset = redisson.getBitSet("testbitset"); + Assert.assertEquals(0, sync(bitset.cardinality()).intValue()); + Assert.assertEquals(0, sync(bitset.size()).intValue()); + + sync(bitset.set(10, true)); + sync(bitset.set(31, true)); + Assert.assertFalse(sync(bitset.get(0))); + Assert.assertTrue(sync(bitset.get(31))); + Assert.assertTrue(sync(bitset.get(10))); + Assert.assertEquals(2, sync(bitset.cardinality()).intValue()); + Assert.assertEquals(32, sync(bitset.size()).intValue()); + } + + @Test + public void testSetRange() { + RBitSetReactive bs = redisson.getBitSet("testbitset"); + sync(bs.set(3, 10)); + Assert.assertEquals(7, sync(bs.cardinality()).intValue()); + Assert.assertEquals(16, sync(bs.size()).intValue()); + } + + @Test + public void testAsBitSet() { + RBitSetReactive bs = redisson.getBitSet("testbitset"); + sync(bs.set(3, true)); + sync(bs.set(41, true)); + Assert.assertEquals(48, sync(bs.size()).intValue()); + + BitSet bitset = sync(bs.asBitSet()); + Assert.assertTrue(bitset.get(3)); + Assert.assertTrue(bitset.get(41)); + Assert.assertEquals(2, bitset.cardinality()); + } + + @Test + public void testAnd() { + RBitSetReactive bs1 = redisson.getBitSet("testbitset1"); + sync(bs1.set(3, 5)); + Assert.assertEquals(2, sync(bs1.cardinality()).intValue()); + Assert.assertEquals(8, sync(bs1.size()).intValue()); + + RBitSetReactive bs2 = redisson.getBitSet("testbitset2"); + sync(bs2.set(4)); + sync(bs2.set(10)); + sync(bs1.and(bs2.getName())); + Assert.assertFalse(sync(bs1.get(3))); + Assert.assertTrue(sync(bs1.get(4))); + Assert.assertFalse(sync(bs1.get(5))); + Assert.assertTrue(sync(bs2.get(10))); + + Assert.assertEquals(1, sync(bs1.cardinality()).intValue()); + Assert.assertEquals(16, sync(bs1.size()).intValue()); + } + + +}