Merge branch 'mrniko/master'

pull/400/head
jackygurui 9 years ago
commit 0b80ec699e

@ -17,6 +17,7 @@ package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@ -289,6 +290,32 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return get(removeAllAsync(c));
}
@Override
public int union(String... names) {
return get(unionAsync(names));
}
@Override
public Future<Integer> unionAsync(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 1);
args.add(getName());
args.addAll(Arrays.asList(names));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNIONSTORE_INT, args.toArray());
}
@Override
public Set<V> readUnion(String... names) {
return get(readUnionAsync(names));
}
@Override
public Future<Set<V>> readUnionAsync(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 1);
args.add(getName());
args.addAll(Arrays.asList(names));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNION, args.toArray());
}
@Override
public void clear() {
delete();

@ -15,6 +15,8 @@
*/
package org.redisson.api;
import java.util.Set;
import org.reactivestreams.Publisher;
/**
@ -43,4 +45,22 @@ public interface RSetReactive<V> extends RCollectionReactive<V> {
* member of this set or no operation was performed
*/
Publisher<Boolean> move(String destination, V member);
/**
* Union sets specified by name and write to current set.
* If current set already exists, it is overwritten.
*
* @param names
* @return
*/
Publisher<Long> union(String... names);
/**
* Union sets specified by name with current set.
* Without current set state change.
*
* @param names
* @return
*/
Publisher<Set<V>> readUnion(String... names);
}

@ -118,6 +118,9 @@ public interface RedisCommands {
RedisCommand<Boolean> SISMEMBER = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor(), 2);
RedisStrictCommand<Integer> SCARD_INT = new RedisStrictCommand<Integer>("SCARD", new IntegerReplayConvertor());
RedisStrictCommand<Long> SCARD = new RedisStrictCommand<Long>("SCARD");
RedisStrictCommand<Integer> SUNIONSTORE_INT = new RedisStrictCommand<Integer>("SUNIONSTORE", new IntegerReplayConvertor());
RedisStrictCommand<Long> SUNIONSTORE = new RedisStrictCommand<Long>("SUNIONSTORE");
RedisCommand<Set<Object>> SUNION = new RedisCommand<Set<Object>>("SUNION", new ObjectSetReplayDecoder());
RedisCommand<Void> LSET = new RedisCommand<Void>("LSET", new VoidReplayConvertor(), 3);
RedisCommand<Object> LPOP = new RedisCommand<Object>("LPOP");

@ -50,4 +50,22 @@ public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V> {
*/
Set<V> readAll();
/**
* Union sets specified by name and write to current set.
* If current set already exists, it is overwritten.
*
* @param names
* @return
*/
int union(String... names);
/**
* Union sets specified by name with current set.
* Without current set state change.
*
* @param names
* @return
*/
Set<V> readUnion(String... names);
}

@ -53,4 +53,22 @@ public interface RSetAsync<V> extends RCollectionAsync<V> {
*/
Future<Set<V>> readAllAsync();
/**
* Union sets specified by name and write to current set.
* If current set already exists, it is overwritten.
*
* @param names
* @return
*/
Future<Integer> unionAsync(String... keys);
/**
* Union sets specified by name with current set.
* Without current set state change.
*
* @param names
* @return
*/
Future<Set<V>> readUnionAsync(String... keys);
}

@ -15,32 +15,20 @@
*/
package org.redisson.reactive;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
import org.redisson.api.RMapReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import reactor.fn.BiFunction;
import reactor.fn.Function;
@ -58,156 +46,86 @@ import reactor.rx.Streams;
*/
public class RedissonMapReactive<K, V> extends RedissonExpirableReactive implements RMapReactive<K, V> {
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
private final RedissonMap<K, V> instance;
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonMap<>(codec, commandExecutor, name);
}
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonMap<>(codec, commandExecutor, name);
}
@Override
public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.HLEN, getName());
return reactive(instance.sizeAsync());
}
@Override
public Publisher<Boolean> containsKey(Object key) {
return commandExecutor.readReactive(getName(), codec, RedisCommands.HEXISTS, getName(), key);
return reactive(instance.containsKeyAsync(key));
}
@Override
public Publisher<Boolean> containsValue(Object value) {
return commandExecutor.evalReadReactive(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do "
+ "if ARGV[1] == s[i] then "
+ "return 1 "
+ "end "
+ "end;" +
"return 0",
Collections.<Object>singletonList(getName()), value);
return reactive(instance.containsValueAsync(value));
}
@Override
public Publisher<Map<K, V>> getAll(Set<K> keys) {
if (keys.size() == 0) {
return newSucceeded(Collections.<K, V>emptyMap());
}
List<Object> args = new ArrayList<Object>(keys.size() + 1);
args.add(getName());
args.addAll(keys);
return commandExecutor.readReactive(getName(), codec, new RedisCommand<Map<Object, Object>>("HMGET", new MapGetAllDecoder(args), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE), args.toArray());
return reactive(instance.getAllAsync(keys));
}
public Publisher<Void> putAll(Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return newSucceeded(null);
}
List<Object> params = new ArrayList<Object>(map.size()*2 + 1);
params.add(getName());
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
params.add(t.getKey());
params.add(t.getValue());
}
return commandExecutor.writeReactive(getName(), codec, RedisCommands.HMSET, params.toArray());
return reactive(instance.putAllAsync(map));
}
@Override
public Publisher<V> putIfAbsent(K key, V value) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_PUT,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return nil "
+ "else "
+ "return redis.call('hget', KEYS[1], ARGV[1]) "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
return reactive(instance.putIfAbsentAsync(key, value));
}
@Override
public Publisher<Long> remove(Object key, Object value) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "return redis.call('hdel', KEYS[1], ARGV[1]) "
+ "else "
+ "return 0 "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
return reactive(instance.removeAsync(key, value));
}
@Override
public Publisher<Boolean> replace(K key, V oldValue, V newValue) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REPLACE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Collections.<Object>singletonList(getName()), key, oldValue, newValue);
return reactive(instance.replaceAsync(key, oldValue, newValue));
}
@Override
public Publisher<V> replace(K key, V value) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REPLACE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return v; "
+ "else "
+ "return nil; "
+ "end",
Collections.<Object>singletonList(getName()), key, value);
return reactive(instance.replaceAsync(key, value));
}
@Override
public Publisher<V> get(K key) {
return commandExecutor.readReactive(getName(), codec, RedisCommands.HGET, getName(), key);
return reactive(instance.getAsync(key));
}
@Override
public Publisher<V> put(K key, V value) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_PUT,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return v",
Collections.<Object>singletonList(getName()), key, value);
return reactive(instance.putAsync(key, value));
}
@Override
public Publisher<V> remove(K key) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "return v",
Collections.<Object>singletonList(getName()), key);
return reactive(instance.removeAsync(key));
}
@Override
public Publisher<Boolean> fastPut(K key, V value) {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.HSET, getName(), key, value);
return reactive(instance.fastPutAsync(key, value));
}
@Override
public Publisher<Long> fastRemove(K ... keys) {
if (keys == null || keys.length == 0) {
return newSucceeded(0L);
}
List<Object> args = new ArrayList<Object>(keys.length + 1);
args.add(getName());
args.addAll(Arrays.asList(keys));
return commandExecutor.writeReactive(getName(), codec, RedisCommands.HDEL, args.toArray());
return reactive(instance.fastRemoveAsync(keys));
}
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
@ -241,14 +159,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
@Override
public Publisher<V> addAndGet(K key, Number value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE,
new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
getName(), keyState, new BigDecimal(value.toString()).toPlainString());
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
return reactive(instance.addAndGetAsync(key, value));
}
@Override

@ -21,6 +21,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import io.netty.util.concurrent.Future;
import reactor.rx.Stream;
import reactor.rx.Streams;
@ -42,6 +43,10 @@ abstract class RedissonObjectReactive implements RObjectReactive {
this.commandExecutor = commandExecutor;
}
public <R> Publisher<R> reactive(Future<R> future) {
return commandExecutor.reactive(future);
}
public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}

@ -17,11 +17,13 @@ package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSet;
import org.redisson.api.RSetReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
@ -37,12 +39,16 @@ import org.redisson.command.CommandReactiveExecutor;
*/
public class RedissonSetReactive<V> extends RedissonExpirableReactive implements RSetReactive<V> {
private final RedissonSet<V> instance;
public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonSet<V>(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
public RedissonSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonSet<V>(codec, commandExecutor, name);
}
@Override
@ -57,7 +63,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
@Override
public Publisher<Boolean> contains(Object o) {
return commandExecutor.readReactive(getName(), codec, RedisCommands.SISMEMBER, getName(), o);
return reactive(instance.containsAsync(o));
}
private Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long startPos) {
@ -71,31 +77,22 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
@Override
public Publisher<V> removeRandom() {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SPOP_SINGLE, getName());
return reactive(instance.removeRandomAsync());
}
@Override
public Publisher<Boolean> remove(Object o) {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SREM_SINGLE, getName(), o);
return reactive(instance.removeAsync(o));
}
@Override
public Publisher<Boolean> move(String destination, V member) {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SMOVE, getName(), destination, member);
return reactive(instance.moveAsync(destination, member));
}
@Override
public Publisher<Boolean> containsAll(Collection<?> c) {
return commandExecutor.evalReadReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local s = redis.call('smembers', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
return reactive(instance.containsAllAsync(c));
}
@Override
@ -108,39 +105,25 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
@Override
public Publisher<Boolean> retainAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local changed = 0 " +
"local s = redis.call('smembers', KEYS[1]) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('SREM', KEYS[1], element) "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
return reactive(instance.retainAllAsync(c));
}
@Override
public Publisher<Boolean> removeAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('srem', KEYS[1], ARGV[i]) == 1 "
+ "then v = 1 end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
return reactive(instance.removeAllAsync(c));
}
@Override
public Publisher<Long> union(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 1);
args.add(getName());
args.addAll(Arrays.asList(names));
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SUNIONSTORE, args.toArray());
}
@Override
public Publisher<Set<V>> readUnion(String... names) {
return reactive(instance.readUnionAsync(names));
}
@Override

@ -289,6 +289,39 @@ public class RedissonSetTest extends BaseTest {
assertThat(set).containsOnly(1, 2);
}
@Test
public void testUnion() {
RSet<Integer> set = redisson.getSet("set");
set.add(5);
set.add(6);
RSet<Integer> set1 = redisson.getSet("set1");
set1.add(1);
set1.add(2);
RSet<Integer> set2 = redisson.getSet("set2");
set2.add(3);
set2.add(4);
assertThat(set.union("set1", "set2")).isEqualTo(4);
assertThat(set).containsOnly(1, 2, 3, 4);
}
@Test
public void testReadUnion() {
RSet<Integer> set = redisson.getSet("set");
set.add(5);
set.add(6);
RSet<Integer> set1 = redisson.getSet("set1");
set1.add(1);
set1.add(2);
RSet<Integer> set2 = redisson.getSet("set2");
set2.add(3);
set2.add(4);
assertThat(set.readUnion("set1", "set2")).containsOnly(1, 2, 3, 4, 5, 6);
assertThat(set).containsOnly(5, 6);
}
@Test
public void testMove() throws Exception {
RSet<Integer> set = redisson.getSet("set");
@ -297,12 +330,12 @@ public class RedissonSetTest extends BaseTest {
set.add(1);
set.add(2);
Assert.assertTrue(set.move("otherSet", 1));
assertThat(set.move("otherSet", 1)).isTrue();
Assert.assertEquals(1, set.size());
assertThat(set.size()).isEqualTo(1);
assertThat(set).contains(2);
Assert.assertEquals(1, otherSet.size());
assertThat(otherSet.size()).isEqualTo(1);
assertThat(otherSet).contains(1);
}

Loading…
Cancel
Save