diff --git a/src/main/java/org/redisson/RedissonSet.java b/src/main/java/org/redisson/RedissonSet.java index 0b16b9fbb..9904bcc30 100644 --- a/src/main/java/org/redisson/RedissonSet.java +++ b/src/main/java/org/redisson/RedissonSet.java @@ -17,6 +17,7 @@ package org.redisson; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -289,6 +290,32 @@ public class RedissonSet extends RedissonExpirable implements RSet { return get(removeAllAsync(c)); } + @Override + public int union(String... names) { + return get(unionAsync(names)); + } + + @Override + public Future unionAsync(String... names) { + List args = new ArrayList(names.length + 1); + args.add(getName()); + args.addAll(Arrays.asList(names)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNIONSTORE_INT, args.toArray()); + } + + @Override + public Set readUnion(String... names) { + return get(readUnionAsync(names)); + } + + @Override + public Future> readUnionAsync(String... names) { + List args = new ArrayList(names.length + 1); + args.add(getName()); + args.addAll(Arrays.asList(names)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNION, args.toArray()); + } + @Override public void clear() { delete(); diff --git a/src/main/java/org/redisson/api/RSetReactive.java b/src/main/java/org/redisson/api/RSetReactive.java index 8e6e54148..392c4b33e 100644 --- a/src/main/java/org/redisson/api/RSetReactive.java +++ b/src/main/java/org/redisson/api/RSetReactive.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import java.util.Set; + import org.reactivestreams.Publisher; /** @@ -43,4 +45,22 @@ public interface RSetReactive extends RCollectionReactive { * member of this set or no operation was performed */ Publisher move(String destination, V member); + + /** + * Union sets specified by name and write to current set. + * If current set already exists, it is overwritten. + * + * @param names + * @return + */ + Publisher union(String... names); + + /** + * Union sets specified by name with current set. + * Without current set state change. + * + * @param names + * @return + */ + Publisher> readUnion(String... names); } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index c9bace756..36893e64b 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -118,6 +118,9 @@ public interface RedisCommands { RedisCommand SISMEMBER = new RedisCommand("SISMEMBER", new BooleanReplayConvertor(), 2); RedisStrictCommand SCARD_INT = new RedisStrictCommand("SCARD", new IntegerReplayConvertor()); RedisStrictCommand SCARD = new RedisStrictCommand("SCARD"); + RedisStrictCommand SUNIONSTORE_INT = new RedisStrictCommand("SUNIONSTORE", new IntegerReplayConvertor()); + RedisStrictCommand SUNIONSTORE = new RedisStrictCommand("SUNIONSTORE"); + RedisCommand> SUNION = new RedisCommand>("SUNION", new ObjectSetReplayDecoder()); RedisCommand LSET = new RedisCommand("LSET", new VoidReplayConvertor(), 3); RedisCommand LPOP = new RedisCommand("LPOP"); diff --git a/src/main/java/org/redisson/core/RSet.java b/src/main/java/org/redisson/core/RSet.java index fc7761607..d98db75f6 100644 --- a/src/main/java/org/redisson/core/RSet.java +++ b/src/main/java/org/redisson/core/RSet.java @@ -50,4 +50,22 @@ public interface RSet extends Set, RExpirable, RSetAsync { */ Set readAll(); + /** + * Union sets specified by name and write to current set. + * If current set already exists, it is overwritten. + * + * @param names + * @return + */ + int union(String... names); + + /** + * Union sets specified by name with current set. + * Without current set state change. + * + * @param names + * @return + */ + Set readUnion(String... names); + } diff --git a/src/main/java/org/redisson/core/RSetAsync.java b/src/main/java/org/redisson/core/RSetAsync.java index 9ab86b013..f74512191 100644 --- a/src/main/java/org/redisson/core/RSetAsync.java +++ b/src/main/java/org/redisson/core/RSetAsync.java @@ -53,4 +53,22 @@ public interface RSetAsync extends RCollectionAsync { */ Future> readAllAsync(); + /** + * Union sets specified by name and write to current set. + * If current set already exists, it is overwritten. + * + * @param names + * @return + */ + Future unionAsync(String... keys); + + /** + * Union sets specified by name with current set. + * Without current set state change. + * + * @param names + * @return + */ + Future> readUnionAsync(String... keys); + } diff --git a/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/src/main/java/org/redisson/reactive/RedissonMapReactive.java index e4c6c2e74..06625e71f 100644 --- a/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -15,32 +15,20 @@ */ package org.redisson.reactive; -import java.io.IOException; -import java.math.BigDecimal; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.reactivestreams.Publisher; +import org.redisson.RedissonMap; import org.redisson.api.RMapReactive; import org.redisson.client.codec.Codec; import org.redisson.client.codec.ScanCodec; -import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.BooleanReplayConvertor; -import org.redisson.client.protocol.convertor.LongReplayConvertor; -import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; -import org.redisson.connection.decoder.MapGetAllDecoder; import reactor.fn.BiFunction; import reactor.fn.Function; @@ -58,156 +46,86 @@ import reactor.rx.Streams; */ public class RedissonMapReactive extends RedissonExpirableReactive implements RMapReactive { - private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_REPLACE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)); - private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new LongReplayConvertor(), 4, ValueType.MAP); - private static final RedisCommand EVAL_PUT = EVAL_REPLACE; + private final RedissonMap instance; public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); + instance = new RedissonMap<>(codec, commandExecutor, name); } public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { super(codec, commandExecutor, name); + instance = new RedissonMap<>(codec, commandExecutor, name); } @Override public Publisher size() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.HLEN, getName()); + return reactive(instance.sizeAsync()); } @Override public Publisher containsKey(Object key) { - return commandExecutor.readReactive(getName(), codec, RedisCommands.HEXISTS, getName(), key); + return reactive(instance.containsKeyAsync(key)); } @Override public Publisher containsValue(Object value) { - return commandExecutor.evalReadReactive(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), - "local s = redis.call('hvals', KEYS[1]);" + - "for i = 0, table.getn(s), 1 do " - + "if ARGV[1] == s[i] then " - + "return 1 " - + "end " - + "end;" + - "return 0", - Collections.singletonList(getName()), value); + return reactive(instance.containsValueAsync(value)); } @Override public Publisher> getAll(Set keys) { - if (keys.size() == 0) { - return newSucceeded(Collections.emptyMap()); - } - List args = new ArrayList(keys.size() + 1); - args.add(getName()); - args.addAll(keys); - return commandExecutor.readReactive(getName(), codec, new RedisCommand>("HMGET", new MapGetAllDecoder(args), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE), args.toArray()); + return reactive(instance.getAllAsync(keys)); } public Publisher putAll(Map map) { - if (map.isEmpty()) { - return newSucceeded(null); - } - - List params = new ArrayList(map.size()*2 + 1); - params.add(getName()); - for (java.util.Map.Entry t : map.entrySet()) { - params.add(t.getKey()); - params.add(t.getValue()); - } - - return commandExecutor.writeReactive(getName(), codec, RedisCommands.HMSET, params.toArray()); + return reactive(instance.putAllAsync(map)); } @Override public Publisher putIfAbsent(K key, V value) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_PUT, - "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " - + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " - + "return nil " - + "else " - + "return redis.call('hget', KEYS[1], ARGV[1]) " - + "end", - Collections.singletonList(getName()), key, value); + return reactive(instance.putIfAbsentAsync(key, value)); } @Override public Publisher remove(Object key, Object value) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE_VALUE, - "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " - + "return redis.call('hdel', KEYS[1], ARGV[1]) " - + "else " - + "return 0 " - + "end", - Collections.singletonList(getName()), key, value); + return reactive(instance.removeAsync(key, value)); } @Override public Publisher replace(K key, V oldValue, V newValue) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REPLACE_VALUE, - "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " - + "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); " - + "return 1; " - + "else " - + "return 0; " - + "end", - Collections.singletonList(getName()), key, oldValue, newValue); + return reactive(instance.replaceAsync(key, oldValue, newValue)); } @Override public Publisher replace(K key, V value) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REPLACE, - "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " - + "local v = redis.call('hget', KEYS[1], ARGV[1]); " - + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " - + "return v; " - + "else " - + "return nil; " - + "end", - Collections.singletonList(getName()), key, value); + return reactive(instance.replaceAsync(key, value)); } @Override public Publisher get(K key) { - return commandExecutor.readReactive(getName(), codec, RedisCommands.HGET, getName(), key); + return reactive(instance.getAsync(key)); } @Override public Publisher put(K key, V value) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_PUT, - "local v = redis.call('hget', KEYS[1], ARGV[1]); " - + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " - + "return v", - Collections.singletonList(getName()), key, value); + return reactive(instance.putAsync(key, value)); } @Override public Publisher remove(K key) { - return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE, - "local v = redis.call('hget', KEYS[1], ARGV[1]); " - + "redis.call('hdel', KEYS[1], ARGV[1]); " - + "return v", - Collections.singletonList(getName()), key); + return reactive(instance.removeAsync(key)); } @Override public Publisher fastPut(K key, V value) { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.HSET, getName(), key, value); + return reactive(instance.fastPutAsync(key, value)); } @Override public Publisher fastRemove(K ... keys) { - if (keys == null || keys.length == 0) { - return newSucceeded(0L); - } - - List args = new ArrayList(keys.length + 1); - args.add(getName()); - args.addAll(Arrays.asList(keys)); - return commandExecutor.writeReactive(getName(), codec, RedisCommands.HDEL, args.toArray()); + return reactive(instance.fastRemoveAsync(keys)); } Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { @@ -241,14 +159,7 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme @Override public Publisher addAndGet(K key, Number value) { - try { - byte[] keyState = codec.getMapKeyEncoder().encode(key); - return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, - new RedisCommand("HINCRBYFLOAT", new NumberConvertor(value.getClass())), - getName(), keyState, new BigDecimal(value.toString()).toPlainString()); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } + return reactive(instance.addAndGetAsync(key, value)); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index cbe42e6f9..63aa6bb90 100644 --- a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -21,6 +21,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; +import io.netty.util.concurrent.Future; import reactor.rx.Stream; import reactor.rx.Streams; @@ -42,6 +43,10 @@ abstract class RedissonObjectReactive implements RObjectReactive { this.commandExecutor = commandExecutor; } + public Publisher reactive(Future future) { + return commandExecutor.reactive(future); + } + public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name) { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } diff --git a/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/src/main/java/org/redisson/reactive/RedissonSetReactive.java index f561e9d5d..17564e6a0 100644 --- a/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -17,11 +17,13 @@ package org.redisson.reactive; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; +import java.util.Set; import org.reactivestreams.Publisher; +import org.redisson.RedissonSet; import org.redisson.api.RSetReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; @@ -37,12 +39,16 @@ import org.redisson.command.CommandReactiveExecutor; */ public class RedissonSetReactive extends RedissonExpirableReactive implements RSetReactive { + private final RedissonSet instance; + public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); + instance = new RedissonSet(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { super(codec, commandExecutor, name); + instance = new RedissonSet(codec, commandExecutor, name); } @Override @@ -57,7 +63,7 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements @Override public Publisher contains(Object o) { - return commandExecutor.readReactive(getName(), codec, RedisCommands.SISMEMBER, getName(), o); + return reactive(instance.containsAsync(o)); } private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { @@ -71,31 +77,22 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements @Override public Publisher removeRandom() { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.SPOP_SINGLE, getName()); + return reactive(instance.removeRandomAsync()); } @Override public Publisher remove(Object o) { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.SREM_SINGLE, getName(), o); + return reactive(instance.removeAsync(o)); } @Override public Publisher move(String destination, V member) { - return commandExecutor.writeReactive(getName(), codec, RedisCommands.SMOVE, getName(), destination, member); + return reactive(instance.moveAsync(destination, member)); } @Override public Publisher containsAll(Collection c) { - return commandExecutor.evalReadReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, - "local s = redis.call('smembers', KEYS[1]);" + - "for i = 0, table.getn(s), 1 do " + - "for j = 0, table.getn(ARGV), 1 do " - + "if ARGV[j] == s[i] " - + "then table.remove(ARGV, j) end " - + "end; " - + "end;" - + "return table.getn(ARGV) == 0 and 1 or 0; ", - Collections.singletonList(getName()), c.toArray()); + return reactive(instance.containsAllAsync(c)); } @Override @@ -108,39 +105,25 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements @Override public Publisher retainAll(Collection c) { - return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, - "local changed = 0 " + - "local s = redis.call('smembers', KEYS[1]) " - + "local i = 0 " - + "while i <= table.getn(s) do " - + "local element = s[i] " - + "local isInAgrs = false " - + "for j = 0, table.getn(ARGV), 1 do " - + "if ARGV[j] == element then " - + "isInAgrs = true " - + "break " - + "end " - + "end " - + "if isInAgrs == false then " - + "redis.call('SREM', KEYS[1], element) " - + "changed = 1 " - + "end " - + "i = i + 1 " - + "end " - + "return changed ", - Collections.singletonList(getName()), c.toArray()); + return reactive(instance.retainAllAsync(c)); } @Override public Publisher removeAll(Collection c) { - return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, - "local v = 0 " + - "for i = 0, table.getn(ARGV), 1 do " - + "if redis.call('srem', KEYS[1], ARGV[i]) == 1 " - + "then v = 1 end " - +"end " - + "return v ", - Collections.singletonList(getName()), c.toArray()); + return reactive(instance.removeAllAsync(c)); + } + + @Override + public Publisher union(String... names) { + List args = new ArrayList(names.length + 1); + args.add(getName()); + args.addAll(Arrays.asList(names)); + return commandExecutor.writeReactive(getName(), codec, RedisCommands.SUNIONSTORE, args.toArray()); + } + + @Override + public Publisher> readUnion(String... names) { + return reactive(instance.readUnionAsync(names)); } @Override diff --git a/src/test/java/org/redisson/RedissonSetTest.java b/src/test/java/org/redisson/RedissonSetTest.java index 5953c339a..c2999c3e6 100644 --- a/src/test/java/org/redisson/RedissonSetTest.java +++ b/src/test/java/org/redisson/RedissonSetTest.java @@ -289,6 +289,39 @@ public class RedissonSetTest extends BaseTest { assertThat(set).containsOnly(1, 2); } + @Test + public void testUnion() { + RSet set = redisson.getSet("set"); + set.add(5); + set.add(6); + RSet set1 = redisson.getSet("set1"); + set1.add(1); + set1.add(2); + RSet set2 = redisson.getSet("set2"); + set2.add(3); + set2.add(4); + + assertThat(set.union("set1", "set2")).isEqualTo(4); + assertThat(set).containsOnly(1, 2, 3, 4); + } + + @Test + public void testReadUnion() { + RSet set = redisson.getSet("set"); + set.add(5); + set.add(6); + RSet set1 = redisson.getSet("set1"); + set1.add(1); + set1.add(2); + RSet set2 = redisson.getSet("set2"); + set2.add(3); + set2.add(4); + + assertThat(set.readUnion("set1", "set2")).containsOnly(1, 2, 3, 4, 5, 6); + assertThat(set).containsOnly(5, 6); + } + + @Test public void testMove() throws Exception { RSet set = redisson.getSet("set"); @@ -297,12 +330,12 @@ public class RedissonSetTest extends BaseTest { set.add(1); set.add(2); - Assert.assertTrue(set.move("otherSet", 1)); + assertThat(set.move("otherSet", 1)).isTrue(); - Assert.assertEquals(1, set.size()); + assertThat(set.size()).isEqualTo(1); assertThat(set).contains(2); - Assert.assertEquals(1, otherSet.size()); + assertThat(otherSet.size()).isEqualTo(1); assertThat(otherSet).contains(1); }