RBitSetAsync interface implemented. #270

pull/297/head
Nikita 9 years ago
parent f9498c0fd9
commit 9792e2ff09

@ -129,6 +129,31 @@ public class CommandBatchExecutorService extends CommandExecutorService {
return get(executeAsync());
}
public Future<Void> executeAsyncVoid() {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
if (commands.isEmpty()) {
return connectionManager.getGroup().next().newSucceededFuture(null);
}
executed = true;
Promise<Void> voidPromise = connectionManager.newPromise();
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
commands = null;
}
});
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0);
}
return voidPromise;
}
public Future<List<?>> executeAsync() {
if (executed) {
throw new IllegalStateException("Batch already executed!");

@ -34,21 +34,11 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
}
public int length() {
return commandExecutor.evalRead(getName(), codec, RedisCommands.EVAL_INTEGER,
"local fromBit = redis.call('bitpos', KEYS[1], 1, -1);"
+ "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;"
+ "for i = toBit, fromBit, -1 do "
+ "if redis.call('getbit', KEYS[1], i) == 1 then "
+ "return i+1;"
+ "end;"
+ "end;" +
"return fromBit+1",
Collections.<Object>singletonList(getName()));
return get(lengthAsync());
}
public void set(BitSet bs) {
commandExecutor.write(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs));
get(setAsync(bs));
}
public boolean get(int bitIndex) {
@ -60,23 +50,15 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
}
public void set(int bitIndex) {
set(bitIndex, true);
get(setAsync(bitIndex, true));
}
public void set(int fromIndex, int toIndex, boolean value) {
if (value) {
set(fromIndex, toIndex);
return;
}
clear(fromIndex, toIndex);
get(setAsync(fromIndex, toIndex, value));
}
public void set(int fromIndex, int toIndex) {
CommandBatchExecutorService executorService = new CommandBatchExecutorService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1);
}
executorService.execute();
get(setAsync(fromIndex, toIndex));
}
public void set(int bitIndex, boolean value) {
@ -88,57 +70,56 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
}
public byte[] toByteArray() {
return commandExecutor.read(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName());
return get(toByteArrayAsync());
}
public Future<byte[]> toByteArrayAsync() {
return commandExecutor.readAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName());
}
public int cardinality() {
return commandExecutor.read(getName(), codec, RedisCommands.BITCOUNT, getName());
return get(cardinalityAsync());
}
public int size() {
int r = commandExecutor.read(getName(), codec, RedisCommands.STRLEN, getName());
return r * 8;
return get(sizeAsync());
}
public void clear(int fromIndex, int toIndex) {
CommandBatchExecutorService executorService = new CommandBatchExecutorService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0);
}
executorService.execute();
get(clearAsync(fromIndex, toIndex));
}
public void clear(int bitIndex) {
set(bitIndex, false);
get(clearAsync(bitIndex));
}
public void clear() {
delete();
get(clearAsync());
}
public void or(String... bitSetNames) {
op("OR", bitSetNames);
get(orAsync(bitSetNames));
}
public void and(String... bitSetNames) {
op("AND", bitSetNames);
get(andAsync(bitSetNames));
}
public void xor(String... bitSetNames) {
op("XOR", bitSetNames);
get(xorAsync(bitSetNames));
}
public void not() {
op("NOT");
get(notAsync());
}
private void op(String op, String... bitSetNames) {
private Future<Void> opAsync(String op, String... bitSetNames) {
List<Object> params = new ArrayList<Object>(bitSetNames.length + 3);
params.add(op);
params.add(getName());
params.add(getName());
params.addAll(Arrays.asList(bitSetNames));
commandExecutor.write(getName(), codec, RedisCommands.BITOP, params.toArray());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BITOP, params.toArray());
}
public BitSet asBitSet() {
@ -173,4 +154,94 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
return asBitSet().toString();
}
@Override
public Future<Integer> lengthAsync() {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_INTEGER,
"local fromBit = redis.call('bitpos', KEYS[1], 1, -1);"
+ "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;"
+ "for i = toBit, fromBit, -1 do "
+ "if redis.call('getbit', KEYS[1], i) == 1 then "
+ "return i+1;"
+ "end;"
+ "end;" +
"return fromBit+1",
Collections.<Object>singletonList(getName()));
}
@Override
public Future<Void> setAsync(int fromIndex, int toIndex, boolean value) {
if (value) {
return setAsync(fromIndex, toIndex);
}
return clearAsync(fromIndex, toIndex);
}
@Override
public Future<Void> clearAsync(int fromIndex, int toIndex) {
CommandBatchExecutorService executorService = new CommandBatchExecutorService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 0);
}
return executorService.executeAsyncVoid();
}
@Override
public Future<Void> setAsync(BitSet bs) {
return commandExecutor.writeAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs));
}
@Override
public Future<Void> notAsync() {
return opAsync("NOT");
}
@Override
public Future<Void> setAsync(int fromIndex, int toIndex) {
CommandBatchExecutorService executorService = new CommandBatchExecutorService(commandExecutor.getConnectionManager());
for (int i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT, getName(), i, 1);
}
return executorService.executeAsyncVoid();
}
@Override
public Future<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.BITS_SIZE, getName());
}
@Override
public Future<Void> setAsync(int bitIndex) {
return setAsync(bitIndex, true);
}
@Override
public Future<Integer> cardinalityAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.BITCOUNT, getName());
}
@Override
public Future<Void> clearAsync(int bitIndex) {
return setAsync(bitIndex, false);
}
@Override
public Future<Void> clearAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_VOID, getName());
}
@Override
public Future<Void> orAsync(String... bitSetNames) {
return opAsync("OR", bitSetNames);
}
@Override
public Future<Void> andAsync(String... bitSetNames) {
return opAsync("AND", bitSetNames);
}
@Override
public Future<Void> xorAsync(String... bitSetNames) {
return opAsync("XOR", bitSetNames);
}
}

@ -20,6 +20,7 @@ import java.util.Map;
import java.util.Set;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
@ -51,6 +52,7 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
public interface RedisCommands {
RedisStrictCommand<Boolean> GETBIT = new RedisStrictCommand<Boolean>("GETBIT", new BooleanReplayConvertor());
RedisStrictCommand<Integer> BITS_SIZE = new RedisStrictCommand<Integer>("STRLEN", new BitsSizeReplayConvertor());
RedisStrictCommand<Integer> STRLEN = new RedisStrictCommand<Integer>("STRLEN", new IntegerReplayConvertor());
RedisStrictCommand<Integer> BITCOUNT = new RedisStrictCommand<Integer>("BITCOUNT", new IntegerReplayConvertor());
RedisStrictCommand<Integer> BITPOS = new RedisStrictCommand<Integer>("BITPOS", new IntegerReplayConvertor());
@ -171,6 +173,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> DEL = new RedisStrictCommand<Long>("DEL");
RedisStrictCommand<Boolean> DEL_SINGLE = new RedisStrictCommand<Boolean>("DEL", new BooleanReplayConvertor());
RedisStrictCommand<Void> DEL_VOID = new RedisStrictCommand<Void>("DEL", new VoidReplayConvertor());
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisCommand<Void> SET = new RedisCommand<Void>("SET", new VoidReplayConvertor(), 2);

@ -0,0 +1,29 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
public class BitsSizeReplayConvertor extends SingleConvertor<Integer> {
@Override
public Integer convert(Object obj) {
if (obj == null) {
return null;
}
int val = ((Long) obj).intValue();
return val * 8;
}
}

@ -20,12 +20,11 @@ import java.util.BitSet;
import io.netty.util.concurrent.Future;
/**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
*
* @author Nikita Koksharov
*
*/
public interface RBitSet extends RExpirable {
public interface RBitSet extends RExpirable, RBitSetAsync {
int length();

@ -0,0 +1,63 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.BitSet;
import io.netty.util.concurrent.Future;
/**
*
* @author Nikita Koksharov
*
*/
public interface RBitSetAsync extends RExpirableAsync {
Future<byte[]> toByteArrayAsync();
Future<Integer> lengthAsync();
Future<Void> setAsync(int fromIndex, int toIndex, boolean value);
Future<Void> clearAsync(int fromIndex, int toIndex);
Future<Void> setAsync(BitSet bs);
Future<Void> notAsync();
Future<Void> setAsync(int fromIndex, int toIndex);
Future<Integer> sizeAsync();
Future<Boolean> getAsync(int bitIndex);
Future<Void> setAsync(int bitIndex);
Future<Void> setAsync(int bitIndex, boolean value);
Future<Integer> cardinalityAsync();
Future<Void> clearAsync(int bitIndex);
Future<Void> clearAsync();
Future<Void> orAsync(String... bitSetNames);
Future<Void> andAsync(String... bitSetNames);
Future<Void> xorAsync(String... bitSetNames);
}
Loading…
Cancel
Save