RBitSetReactive added. #210

pull/337/head
Nikita 9 years ago
parent fcbdfd0dea
commit 9a4311e844

@ -0,0 +1,172 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.BitSetCodec;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandBatchAsyncService;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.core.RBitSetReactive;
import reactor.rx.Streams;
public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive {
protected RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
}
public Publisher<Boolean> get(int bitIndex) {
return commandExecutor.readObservable(getName(), codec, RedisCommands.GETBIT, getName(), bitIndex);
}
public Publisher<Void> set(int bitIndex, boolean value) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.SETBIT, getName(), bitIndex, value ? 1 : 0);
}
public Publisher<byte[]> toByteArray() {
return commandExecutor.readObservable(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName());
}
private Publisher<Void> op(String op, String... bitSetNames) {
List<Object> params = new ArrayList<Object>(bitSetNames.length + 3);
params.add(op);
params.add(getName());
params.add(getName());
params.addAll(Arrays.asList(bitSetNames));
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BITOP, params.toArray());
}
public Publisher<BitSet> asBitSet() {
return commandExecutor.readObservable(getName(), BitSetCodec.INSTANCE, RedisCommands.GET, getName());
}
//Copied from: https://github.com/xetorthio/jedis/issues/301
private static byte[] toByteArrayReverse(BitSet bits) {
byte[] bytes = new byte[bits.length() / 8 + 1];
for (int i = 0; i < bits.length(); i++) {
if (bits.get(i)) {
final int value = bytes[i / 8] | (1 << (7 - (i % 8)));
bytes[i / 8] = (byte) value;
}
}
return bytes;
}
@Override
public Publisher<Integer> length() {
return commandExecutor.evalReadObservable(getName(), codec, RedisCommands.EVAL_INTEGER,
"local fromBit = redis.call('bitpos', KEYS[1], 1, -1);"
+ "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;"
+ "for i = toBit, fromBit, -1 do "
+ "if redis.call('getbit', KEYS[1], i) == 1 then "
+ "return i+1;"
+ "end;"
+ "end;" +
"return fromBit+1",
Collections.<Object>singletonList(getName()));
}
@Override
public Publisher<Void> set(int fromIndex, int toIndex, boolean value) {
if (value) {
return set(fromIndex, toIndex);
}
return clear(fromIndex, toIndex);
}
@Override
public Publisher<Void> clear(int fromIndex, int toIndex) {
CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0);
}
return new NettyFuturePublisher<Void>(executorService.executeAsyncVoid());
}
@Override
public Publisher<Void> set(BitSet bs) {
return commandExecutor.writeObservable(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs));
}
@Override
public Publisher<Void> not() {
return op("NOT");
}
@Override
public Publisher<Void> set(int fromIndex, int toIndex) {
CommandBatchAsyncService executorService = new CommandBatchAsyncService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1);
}
return new NettyFuturePublisher<Void>(executorService.executeAsyncVoid());
}
@Override
public Publisher<Integer> size() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.BITS_SIZE, getName());
}
@Override
public Publisher<Void> set(int bitIndex) {
return set(bitIndex, true);
}
@Override
public Publisher<Integer> cardinality() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.BITCOUNT, getName());
}
@Override
public Publisher<Void> clear(int bitIndex) {
return set(bitIndex, false);
}
@Override
public Publisher<Void> clear() {
return commandExecutor.writeObservable(getName(), RedisCommands.DEL_VOID, getName());
}
@Override
public Publisher<Void> or(String... bitSetNames) {
return op("OR", bitSetNames);
}
@Override
public Publisher<Void> and(String... bitSetNames) {
return op("AND", bitSetNames);
}
@Override
public Publisher<Void> xor(String... bitSetNames) {
return op("XOR", bitSetNames);
}
@Override
public String toString() {
return Streams.create(asBitSet()).next().poll().toString();
}
}

@ -30,6 +30,7 @@ import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.RAtomicLongReactive;
import org.redisson.core.RBitSetReactive;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RDequeReactive;
@ -220,6 +221,11 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonAtomicLongReactive(commandExecutor, name);
}
@Override
public RBitSetReactive getBitSet(String name) {
return new RedissonBitSetReactive(commandExecutor, name);
}
public Config getConfig() {
return config;
}
@ -238,6 +244,5 @@ public class RedissonReactive implements RedissonReactiveClient {
public void flushall() {
commandExecutor.get(commandExecutor.writeAllAsync(RedisCommands.FLUSHALL));
}
}

@ -19,6 +19,7 @@ import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.core.RAtomicLongReactive;
import org.redisson.core.RBitSetReactive;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RDequeReactive;
@ -174,16 +175,8 @@ public interface RedissonReactiveClient {
*/
RAtomicLongReactive getAtomicLong(String name);
// /**
// * Returns "count down latch" instance by name.
// *
// * @param name of the "count down latch"
// * @return
// */
// RCountDownLatch getCountDownLatch(String name);
//
// RBitSet getBitSet(String name);
//
RBitSetReactive getBitSet(String name);
// /**
// * Returns script operations object
// *

@ -0,0 +1,79 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import java.util.BitSet;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
public class BitSetCodec implements Codec {
public static final BitSetCodec INSTANCE = new BitSetCodec();
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) {
byte[] result = new byte[buf.readableBytes()];
buf.readBytes(result);
return fromByteArrayReverse(result);
}
};
private static BitSet fromByteArrayReverse(byte[] bytes) {
BitSet bits = new BitSet();
for (int i = 0; i < bytes.length * 8; i++) {
if ((bytes[i / 8] & (1 << (7 - (i % 8)))) != 0) {
bits.set(i);
}
}
return bits;
}
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getMapValueDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapKeyEncoder() {
throw new UnsupportedOperationException();
}
}

@ -0,0 +1,65 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.BitSet;
import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
*/
public interface RBitSetReactive extends RExpirableReactive {
Publisher<BitSet> asBitSet();
Publisher<byte[]> toByteArray();
Publisher<Integer> length();
Publisher<Void> set(int fromIndex, int toIndex, boolean value);
Publisher<Void> clear(int fromIndex, int toIndex);
Publisher<Void> set(BitSet bs);
Publisher<Void> not();
Publisher<Void> set(int fromIndex, int toIndex);
Publisher<Integer> size();
Publisher<Boolean> get(int bitIndex);
Publisher<Void> set(int bitIndex);
Publisher<Void> set(int bitIndex, boolean value);
Publisher<Integer> cardinality();
Publisher<Void> clear(int bitIndex);
Publisher<Void> clear();
Publisher<Void> or(String... bitSetNames);
Publisher<Void> and(String... bitSetNames);
Publisher<Void> xor(String... bitSetNames);
}

@ -0,0 +1,130 @@
package org.redisson;
import java.util.BitSet;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RBitSetReactive;
public class RedissonBitSetReactiveTest extends BaseReactiveTest {
@Test
public void testLength() {
RBitSetReactive bs = redisson.getBitSet("testbitset");
sync(bs.set(0, 5));
sync(bs.clear(0, 1));
Assert.assertEquals(5, sync(bs.length()).intValue());
sync(bs.clear());
sync(bs.set(28));
sync(bs.set(31));
Assert.assertEquals(32, sync(bs.length()).intValue());
sync(bs.clear());
sync(bs.set(3));
sync(bs.set(7));
Assert.assertEquals(8, sync(bs.length()).intValue());
sync(bs.clear());
sync(bs.set(3));
sync(bs.set(120));
sync(bs.set(121));
Assert.assertEquals(122, sync(bs.length()).intValue());
sync(bs.clear());
sync(bs.set(0));
Assert.assertEquals(1, sync(bs.length()).intValue());
}
@Test
public void testClear() {
RBitSetReactive bs = redisson.getBitSet("testbitset");
sync(bs.set(0, 8));
sync(bs.clear(0, 3));
Assert.assertEquals("{3, 4, 5, 6, 7}", bs.toString());
}
@Test
public void testNot() {
RBitSetReactive bs = redisson.getBitSet("testbitset");
sync(bs.set(3));
sync(bs.set(5));
sync(bs.not());
Assert.assertEquals("{0, 1, 2, 4, 6, 7}", bs.toString());
}
@Test
public void testSet() {
RBitSetReactive bs = redisson.getBitSet("testbitset");
sync(bs.set(3));
sync(bs.set(5));
Assert.assertEquals("{3, 5}", bs.toString());
BitSet bs1 = new BitSet();
bs1.set(1);
bs1.set(10);
sync(bs.set(bs1));
bs = redisson.getBitSet("testbitset");
Assert.assertEquals("{1, 10}", bs.toString());
}
@Test
public void testSetGet() {
RBitSetReactive bitset = redisson.getBitSet("testbitset");
Assert.assertEquals(0, sync(bitset.cardinality()).intValue());
Assert.assertEquals(0, sync(bitset.size()).intValue());
sync(bitset.set(10, true));
sync(bitset.set(31, true));
Assert.assertFalse(sync(bitset.get(0)));
Assert.assertTrue(sync(bitset.get(31)));
Assert.assertTrue(sync(bitset.get(10)));
Assert.assertEquals(2, sync(bitset.cardinality()).intValue());
Assert.assertEquals(32, sync(bitset.size()).intValue());
}
@Test
public void testSetRange() {
RBitSetReactive bs = redisson.getBitSet("testbitset");
sync(bs.set(3, 10));
Assert.assertEquals(7, sync(bs.cardinality()).intValue());
Assert.assertEquals(16, sync(bs.size()).intValue());
}
@Test
public void testAsBitSet() {
RBitSetReactive bs = redisson.getBitSet("testbitset");
sync(bs.set(3, true));
sync(bs.set(41, true));
Assert.assertEquals(48, sync(bs.size()).intValue());
BitSet bitset = sync(bs.asBitSet());
Assert.assertTrue(bitset.get(3));
Assert.assertTrue(bitset.get(41));
Assert.assertEquals(2, bitset.cardinality());
}
@Test
public void testAnd() {
RBitSetReactive bs1 = redisson.getBitSet("testbitset1");
sync(bs1.set(3, 5));
Assert.assertEquals(2, sync(bs1.cardinality()).intValue());
Assert.assertEquals(8, sync(bs1.size()).intValue());
RBitSetReactive bs2 = redisson.getBitSet("testbitset2");
sync(bs2.set(4));
sync(bs2.set(10));
sync(bs1.and(bs2.getName()));
Assert.assertFalse(sync(bs1.get(3)));
Assert.assertTrue(sync(bs1.get(4)));
Assert.assertFalse(sync(bs1.get(5)));
Assert.assertTrue(sync(bs2.get(10)));
Assert.assertEquals(1, sync(bs1.cardinality()).intValue());
Assert.assertEquals(16, sync(bs1.size()).intValue());
}
}
Loading…
Cancel
Save