From 9792e2ff093105fa5d6f06c5e3b4ff6570e151ec Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 17 Nov 2015 12:22:20 +0300 Subject: [PATCH] RBitSetAsync interface implemented. #270 --- .../redisson/CommandBatchExecutorService.java | 25 +++ .../java/org/redisson/RedissonBitSet.java | 151 +++++++++++++----- .../client/protocol/RedisCommands.java | 3 + .../convertor/BitsSizeReplayConvertor.java | 29 ++++ src/main/java/org/redisson/core/RBitSet.java | 3 +- .../java/org/redisson/core/RBitSetAsync.java | 63 ++++++++ 6 files changed, 232 insertions(+), 42 deletions(-) create mode 100644 src/main/java/org/redisson/client/protocol/convertor/BitsSizeReplayConvertor.java create mode 100644 src/main/java/org/redisson/core/RBitSetAsync.java diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 324194d56..7a952b933 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -129,6 +129,31 @@ public class CommandBatchExecutorService extends CommandExecutorService { return get(executeAsync()); } + public Future executeAsyncVoid() { + if (executed) { + throw new IllegalStateException("Batch already executed!"); + } + + if (commands.isEmpty()) { + return connectionManager.getGroup().next().newSucceededFuture(null); + } + executed = true; + + Promise voidPromise = connectionManager.newPromise(); + voidPromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + commands = null; + } + }); + + AtomicInteger slots = new AtomicInteger(commands.size()); + for (java.util.Map.Entry e : commands.entrySet()) { + execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0); + } + return voidPromise; + } + public Future> executeAsync() { if (executed) { throw new IllegalStateException("Batch already executed!"); diff --git a/src/main/java/org/redisson/RedissonBitSet.java b/src/main/java/org/redisson/RedissonBitSet.java index 1a1ae3b95..2fbebe2d3 100644 --- a/src/main/java/org/redisson/RedissonBitSet.java +++ b/src/main/java/org/redisson/RedissonBitSet.java @@ -34,21 +34,11 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet { } public int length() { - return commandExecutor.evalRead(getName(), codec, RedisCommands.EVAL_INTEGER, - "local fromBit = redis.call('bitpos', KEYS[1], 1, -1);" - + "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;" - + "for i = toBit, fromBit, -1 do " - + "if redis.call('getbit', KEYS[1], i) == 1 then " - + "return i+1;" - + "end;" - + "end;" + - "return fromBit+1", - Collections.singletonList(getName())); - + return get(lengthAsync()); } public void set(BitSet bs) { - commandExecutor.write(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs)); + get(setAsync(bs)); } public boolean get(int bitIndex) { @@ -60,23 +50,15 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet { } public void set(int bitIndex) { - set(bitIndex, true); + get(setAsync(bitIndex, true)); } public void set(int fromIndex, int toIndex, boolean value) { - if (value) { - set(fromIndex, toIndex); - return; - } - clear(fromIndex, toIndex); + get(setAsync(fromIndex, toIndex, value)); } public void set(int fromIndex, int toIndex) { - CommandBatchExecutorService executorService = new CommandBatchExecutorService(commandExecutor.getConnectionManager()); - for (int i = fromIndex; i < toIndex; i++) { - executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1); - } - executorService.execute(); + get(setAsync(fromIndex, toIndex)); } public void set(int bitIndex, boolean value) { @@ -88,57 +70,56 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet { } public byte[] toByteArray() { - return commandExecutor.read(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName()); + return get(toByteArrayAsync()); + } + + public Future toByteArrayAsync() { + return commandExecutor.readAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName()); } public int cardinality() { - return commandExecutor.read(getName(), codec, RedisCommands.BITCOUNT, getName()); + return get(cardinalityAsync()); } public int size() { - int r = commandExecutor.read(getName(), codec, RedisCommands.STRLEN, getName()); - return r * 8; + return get(sizeAsync()); } public void clear(int fromIndex, int toIndex) { - CommandBatchExecutorService executorService = new CommandBatchExecutorService(commandExecutor.getConnectionManager()); - for (int i = fromIndex; i < toIndex; i++) { - executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0); - } - executorService.execute(); + get(clearAsync(fromIndex, toIndex)); } public void clear(int bitIndex) { - set(bitIndex, false); + get(clearAsync(bitIndex)); } public void clear() { - delete(); + get(clearAsync()); } public void or(String... bitSetNames) { - op("OR", bitSetNames); + get(orAsync(bitSetNames)); } public void and(String... bitSetNames) { - op("AND", bitSetNames); + get(andAsync(bitSetNames)); } public void xor(String... bitSetNames) { - op("XOR", bitSetNames); + get(xorAsync(bitSetNames)); } public void not() { - op("NOT"); + get(notAsync()); } - private void op(String op, String... bitSetNames) { + private Future opAsync(String op, String... bitSetNames) { List params = new ArrayList(bitSetNames.length + 3); params.add(op); params.add(getName()); params.add(getName()); params.addAll(Arrays.asList(bitSetNames)); - commandExecutor.write(getName(), codec, RedisCommands.BITOP, params.toArray()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BITOP, params.toArray()); } public BitSet asBitSet() { @@ -173,4 +154,94 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet { return asBitSet().toString(); } + @Override + public Future lengthAsync() { + return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_INTEGER, + "local fromBit = redis.call('bitpos', KEYS[1], 1, -1);" + + "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;" + + "for i = toBit, fromBit, -1 do " + + "if redis.call('getbit', KEYS[1], i) == 1 then " + + "return i+1;" + + "end;" + + "end;" + + "return fromBit+1", + Collections.singletonList(getName())); + } + + @Override + public Future setAsync(int fromIndex, int toIndex, boolean value) { + if (value) { + return setAsync(fromIndex, toIndex); + } + return clearAsync(fromIndex, toIndex); + } + + @Override + public Future clearAsync(int fromIndex, int toIndex) { + CommandBatchExecutorService executorService = new CommandBatchExecutorService(commandExecutor.getConnectionManager()); + for (int i = fromIndex; i < toIndex; i++) { + executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0); + } + return executorService.executeAsyncVoid(); + } + + @Override + public Future setAsync(BitSet bs) { + return commandExecutor.writeAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs)); + } + + @Override + public Future notAsync() { + return opAsync("NOT"); + } + + @Override + public Future setAsync(int fromIndex, int toIndex) { + CommandBatchExecutorService executorService = new CommandBatchExecutorService(commandExecutor.getConnectionManager()); + for (int i = fromIndex; i < toIndex; i++) { + executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1); + } + return executorService.executeAsyncVoid(); + } + + @Override + public Future sizeAsync() { + return commandExecutor.readAsync(getName(), codec, RedisCommands.BITS_SIZE, getName()); + } + + @Override + public Future setAsync(int bitIndex) { + return setAsync(bitIndex, true); + } + + @Override + public Future cardinalityAsync() { + return commandExecutor.readAsync(getName(), codec, RedisCommands.BITCOUNT, getName()); + } + + @Override + public Future clearAsync(int bitIndex) { + return setAsync(bitIndex, false); + } + + @Override + public Future clearAsync() { + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_VOID, getName()); + } + + @Override + public Future orAsync(String... bitSetNames) { + return opAsync("OR", bitSetNames); + } + + @Override + public Future andAsync(String... bitSetNames) { + return opAsync("AND", bitSetNames); + } + + @Override + public Future xorAsync(String... bitSetNames) { + return opAsync("XOR", bitSetNames); + } + } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 526915e0c..3bf93bc29 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Set; import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; @@ -51,6 +52,7 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; public interface RedisCommands { RedisStrictCommand GETBIT = new RedisStrictCommand("GETBIT", new BooleanReplayConvertor()); + RedisStrictCommand BITS_SIZE = new RedisStrictCommand("STRLEN", new BitsSizeReplayConvertor()); RedisStrictCommand STRLEN = new RedisStrictCommand("STRLEN", new IntegerReplayConvertor()); RedisStrictCommand BITCOUNT = new RedisStrictCommand("BITCOUNT", new IntegerReplayConvertor()); RedisStrictCommand BITPOS = new RedisStrictCommand("BITPOS", new IntegerReplayConvertor()); @@ -171,6 +173,7 @@ public interface RedisCommands { RedisStrictCommand DEL = new RedisStrictCommand("DEL"); RedisStrictCommand DEL_SINGLE = new RedisStrictCommand("DEL", new BooleanReplayConvertor()); + RedisStrictCommand DEL_VOID = new RedisStrictCommand("DEL", new VoidReplayConvertor()); RedisCommand GET = new RedisCommand("GET"); RedisCommand SET = new RedisCommand("SET", new VoidReplayConvertor(), 2); diff --git a/src/main/java/org/redisson/client/protocol/convertor/BitsSizeReplayConvertor.java b/src/main/java/org/redisson/client/protocol/convertor/BitsSizeReplayConvertor.java new file mode 100644 index 000000000..4304cd100 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/convertor/BitsSizeReplayConvertor.java @@ -0,0 +1,29 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.convertor; + +public class BitsSizeReplayConvertor extends SingleConvertor { + + @Override + public Integer convert(Object obj) { + if (obj == null) { + return null; + } + int val = ((Long) obj).intValue(); + return val * 8; + } + +} diff --git a/src/main/java/org/redisson/core/RBitSet.java b/src/main/java/org/redisson/core/RBitSet.java index bc37606c2..3ddbb3bf4 100644 --- a/src/main/java/org/redisson/core/RBitSet.java +++ b/src/main/java/org/redisson/core/RBitSet.java @@ -20,12 +20,11 @@ import java.util.BitSet; import io.netty.util.concurrent.Future; /** - * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} * * @author Nikita Koksharov * */ -public interface RBitSet extends RExpirable { +public interface RBitSet extends RExpirable, RBitSetAsync { int length(); diff --git a/src/main/java/org/redisson/core/RBitSetAsync.java b/src/main/java/org/redisson/core/RBitSetAsync.java new file mode 100644 index 000000000..f22de3927 --- /dev/null +++ b/src/main/java/org/redisson/core/RBitSetAsync.java @@ -0,0 +1,63 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.BitSet; + +import io.netty.util.concurrent.Future; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RBitSetAsync extends RExpirableAsync { + + Future toByteArrayAsync(); + + Future lengthAsync(); + + Future setAsync(int fromIndex, int toIndex, boolean value); + + Future clearAsync(int fromIndex, int toIndex); + + Future setAsync(BitSet bs); + + Future notAsync(); + + Future setAsync(int fromIndex, int toIndex); + + Future sizeAsync(); + + Future getAsync(int bitIndex); + + Future setAsync(int bitIndex); + + Future setAsync(int bitIndex, boolean value); + + Future cardinalityAsync(); + + Future clearAsync(int bitIndex); + + Future clearAsync(); + + Future orAsync(String... bitSetNames); + + Future andAsync(String... bitSetNames); + + Future xorAsync(String... bitSetNames); + +}