RedissonMapReactive refactored

pull/400/head
Nikita 9 years ago
parent 4ed56f86c3
commit 838febf3c5

@ -15,32 +15,20 @@
*/
package org.redisson.reactive;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
import org.redisson.api.RMapReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import reactor.fn.BiFunction;
import reactor.fn.Function;
@ -58,156 +46,86 @@ import reactor.rx.Streams;
*/
public class RedissonMapReactive<K, V> extends RedissonExpirableReactive implements RMapReactive<K, V> {
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
private final RedissonMap<K, V> instance;
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonMap<>(codec, commandExecutor, name);
}
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonMap<>(codec, commandExecutor, name);
}
@Override
public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.HLEN, getName());
return reactive(instance.sizeAsync());
}
@Override
public Publisher<Boolean> containsKey(Object key) {
return commandExecutor.readReactive(getName(), codec, RedisCommands.HEXISTS, getName(), key);
return reactive(instance.containsKeyAsync(key));
}
@Override
public Publisher<Boolean> containsValue(Object value) {
return commandExecutor.evalReadReactive(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do "
+ "if ARGV[1] == s[i] then "
+ "return 1 "
+ "end "
+ "end;" +
"return 0",
Collections.<Object>singletonList(getName()), value);
return reactive(instance.containsValueAsync(value));
}
@Override
public Publisher<Map<K, V>> getAll(Set<K> keys) {
if (keys.size() == 0) {
return newSucceeded(Collections.<K, V>emptyMap());
}
List<Object> args = new ArrayList<Object>(keys.size() + 1);
args.add(getName());
args.addAll(keys);
return commandExecutor.readReactive(getName(), codec, new RedisCommand<Map<Object, Object>>("HMGET", new MapGetAllDecoder(args), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE), args.toArray());
return reactive(instance.getAllAsync(keys));
}
public Publisher<Void> putAll(Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return newSucceeded(null);
}
List<Object> params = new ArrayList<Object>(map.size()*2 + 1);
params.add(getName());
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
params.add(t.getKey());
params.add(t.getValue());
}
return commandExecutor.writeReactive(getName(), codec, RedisCommands.HMSET, params.toArray());
return reactive(instance.putAllAsync(map));
}
@Override
public Publisher<V> putIfAbsent(K key, V value) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_PUT,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return nil "
+ "else "
+ "return redis.call('hget', KEYS[1], ARGV[1]) "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
return reactive(instance.putIfAbsentAsync(key, value));
}
@Override
public Publisher<Long> remove(Object key, Object value) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "return redis.call('hdel', KEYS[1], ARGV[1]) "
+ "else "
+ "return 0 "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
return reactive(instance.removeAsync(key, value));
}
@Override
public Publisher<Boolean> replace(K key, V oldValue, V newValue) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REPLACE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Collections.<Object>singletonList(getName()), key, oldValue, newValue);
return reactive(instance.replaceAsync(key, oldValue, newValue));
}
@Override
public Publisher<V> replace(K key, V value) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REPLACE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return v; "
+ "else "
+ "return nil; "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
return reactive(instance.replaceAsync(key, value));
}
@Override
public Publisher<V> get(K key) {
return commandExecutor.readReactive(getName(), codec, RedisCommands.HGET, getName(), key);
return reactive(instance.getAsync(key));
}
@Override
public Publisher<V> put(K key, V value) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_PUT,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return v",
Collections.<Object>singletonList(getName()), key, value);
return reactive(instance.putAsync(key, value));
}
@Override
public Publisher<V> remove(K key) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "return v",
Collections.<Object>singletonList(getName()), key);
return reactive(instance.removeAsync(key));
}
@Override
public Publisher<Boolean> fastPut(K key, V value) {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.HSET, getName(), key, value);
return reactive(instance.fastPutAsync(key, value));
}
@Override
public Publisher<Long> fastRemove(K ... keys) {
if (keys == null || keys.length == 0) {
return newSucceeded(0L);
}
List<Object> args = new ArrayList<Object>(keys.length + 1);
args.add(getName());
args.addAll(Arrays.asList(keys));
return commandExecutor.writeReactive(getName(), codec, RedisCommands.HDEL, args.toArray());
return reactive(instance.fastRemoveAsync(keys));
}
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
@ -241,14 +159,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
@Override
public Publisher<V> addAndGet(K key, Number value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE,
new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
getName(), keyState, new BigDecimal(value.toString()).toPlainString());
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
return reactive(instance.addAndGetAsync(key, value));
}
@Override

Loading…
Cancel
Save