From 838febf3c56041620c26a0a99e66a7bcbc409651 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 9 Feb 2016 12:40:45 +0300 Subject: [PATCH] RedissonMapReactive refactored --- .../reactive/RedissonMapReactive.java | 127 +++--------------- 1 file changed, 19 insertions(+), 108 deletions(-) diff --git a/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/src/main/java/org/redisson/reactive/RedissonMapReactive.java index e4c6c2e74..06625e71f 100644 --- a/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -15,32 +15,20 @@ */ package org.redisson.reactive; -import java.io.IOException; -import java.math.BigDecimal; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.reactivestreams.Publisher; +import org.redisson.RedissonMap; import org.redisson.api.RMapReactive; import org.redisson.client.codec.Codec; import org.redisson.client.codec.ScanCodec; -import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.BooleanReplayConvertor; -import org.redisson.client.protocol.convertor.LongReplayConvertor; -import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; -import org.redisson.connection.decoder.MapGetAllDecoder; import reactor.fn.BiFunction; import reactor.fn.Function; @@ -58,156 +46,86 @@ import reactor.rx.Streams; */ public class RedissonMapReactive extends RedissonExpirableReactive implements RMapReactive { - private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_REPLACE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)); - private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new LongReplayConvertor(), 4, ValueType.MAP); - private static final RedisCommand EVAL_PUT = EVAL_REPLACE; + private final RedissonMap instance; public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); + instance = new RedissonMap<>(codec, commandExecutor, name); } public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { super(codec, commandExecutor, name); + instance = new RedissonMap<>(codec, commandExecutor, name); } @Override public Publisher size() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.HLEN, getName()); + return reactive(instance.sizeAsync()); } @Override public Publisher containsKey(Object key) { - return commandExecutor.readReactive(getName(), codec, RedisCommands.HEXISTS, getName(), key); + return reactive(instance.containsKeyAsync(key)); } @Override public Publisher containsValue(Object value) { - return commandExecutor.evalReadReactive(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), - "local s = redis.call('hvals', KEYS[1]);" + - "for i = 0, table.getn(s), 1 do " - + "if ARGV[1] == s[i] then " - + "return 1 " - + "end " - + "end;" + - "return 0", - Collections.singletonList(getName()), value); + return reactive(instance.containsValueAsync(value)); } @Override public Publisher> getAll(Set keys) { - if (keys.size() == 0) { - return newSucceeded(Collections.emptyMap()); - } - List args = new ArrayList(keys.size() + 1); - args.add(getName()); - args.addAll(keys); - return commandExecutor.readReactive(getName(), codec, new RedisCommand>("HMGET", new MapGetAllDecoder(args), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE), args.toArray()); + return reactive(instance.getAllAsync(keys)); } public Publisher putAll(Map map) { - if (map.isEmpty()) { - return newSucceeded(null); - } - - List params = new ArrayList(map.size()*2 + 1); - params.add(getName()); - for (java.util.Map.Entry t : map.entrySet()) { - params.add(t.getKey()); - params.add(t.getValue()); - } - - return commandExecutor.writeReactive(getName(), codec, RedisCommands.HMSET, params.toArray()); + return reactive(instance.putAllAsync(map)); } @Override public Publisher putIfAbsent(K key, V value) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_PUT, - "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " - + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " - + "return nil " - + "else " - + "return redis.call('hget', KEYS[1], ARGV[1]) " - + "end", - Collections.singletonList(getName()), key, value); + return reactive(instance.putIfAbsentAsync(key, value)); } @Override public Publisher remove(Object key, Object value) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE_VALUE, - "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " - + "return redis.call('hdel', KEYS[1], ARGV[1]) " - + "else " - + "return 0 " - + "end", - Collections.singletonList(getName()), key, value); + return reactive(instance.removeAsync(key, value)); } @Override public Publisher replace(K key, V oldValue, V newValue) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REPLACE_VALUE, - "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " - + "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); " - + "return 1; " - + "else " - + "return 0; " - + "end", - Collections.singletonList(getName()), key, oldValue, newValue); + return reactive(instance.replaceAsync(key, oldValue, newValue)); } @Override public Publisher replace(K key, V value) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REPLACE, - "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " - + "local v = redis.call('hget', KEYS[1], ARGV[1]); " - + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " - + "return v; " - + "else " - + "return nil; " - + "end", - Collections.singletonList(getName()), key, value); + return reactive(instance.replaceAsync(key, value)); } @Override public Publisher get(K key) { - return commandExecutor.readReactive(getName(), codec, RedisCommands.HGET, getName(), key); + return reactive(instance.getAsync(key)); } @Override public Publisher put(K key, V value) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_PUT, - "local v = redis.call('hget', KEYS[1], ARGV[1]); " - + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " - + "return v", - Collections.singletonList(getName()), key, value); + return reactive(instance.putAsync(key, value)); } @Override public Publisher remove(K key) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE, - "local v = redis.call('hget', KEYS[1], ARGV[1]); " - + "redis.call('hdel', KEYS[1], ARGV[1]); " - + "return v", - Collections.singletonList(getName()), key); + return reactive(instance.removeAsync(key)); } @Override public Publisher fastPut(K key, V value) { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.HSET, getName(), key, value); + return reactive(instance.fastPutAsync(key, value)); } @Override public Publisher fastRemove(K ... keys) { - if (keys == null || keys.length == 0) { - return newSucceeded(0L); - } - - List args = new ArrayList(keys.length + 1); - args.add(getName()); - args.addAll(Arrays.asList(keys)); - return commandExecutor.writeReactive(getName(), codec, RedisCommands.HDEL, args.toArray()); + return reactive(instance.fastRemoveAsync(keys)); } Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { @@ -241,14 +159,7 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme @Override public Publisher addAndGet(K key, Number value) { - try { - byte[] keyState = codec.getMapKeyEncoder().encode(key); - return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, - new RedisCommand("HINCRBYFLOAT", new NumberConvertor(value.getClass())), - getName(), keyState, new BigDecimal(value.toString()).toPlainString()); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } + return reactive(instance.addAndGetAsync(key, value)); } @Override