RListReactive, RMapCacheReactive, RSetCacheReactive and RSetReactive are up-to-date to Async interfaces. #1441

pull/1461/head
Nikita 7 years ago
parent 1b62f99e6f
commit daac20fcd2

@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -56,18 +55,15 @@ import io.netty.buffer.ByteBuf;
*/ */
public abstract class RedissonMultimap<K, V> extends RedissonExpirable implements RMultimap<K, V> { public abstract class RedissonMultimap<K, V> extends RedissonExpirable implements RMultimap<K, V> {
private final UUID id;
final String prefix; final String prefix;
RedissonMultimap(CommandAsyncExecutor commandAsyncExecutor, String name) { RedissonMultimap(CommandAsyncExecutor commandAsyncExecutor, String name) {
super(commandAsyncExecutor, name); super(commandAsyncExecutor, name);
this.id = commandAsyncExecutor.getConnectionManager().getId();
prefix = suffixName(getName(), ""); prefix = suffixName(getName(), "");
} }
RedissonMultimap(Codec codec, CommandAsyncExecutor commandAsyncExecutor, String name) { RedissonMultimap(Codec codec, CommandAsyncExecutor commandAsyncExecutor, String name) {
super(codec, commandAsyncExecutor, name); super(codec, commandAsyncExecutor, name);
this.id = commandAsyncExecutor.getConnectionManager().getId();
prefix = suffixName(getName(), ""); prefix = suffixName(getName(), "");
} }

@ -92,6 +92,11 @@ public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<
*/ */
void trim(int fromIndex, int toIndex); void trim(int fromIndex, int toIndex);
/**
* Remove object by specified index
*
* @param index - index of object
*/
void fastRemove(int index); void fastRemove(int index);
} }

@ -16,6 +16,7 @@
package org.redisson.api; package org.redisson.api;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -29,6 +30,32 @@ import org.reactivestreams.Publisher;
// TODO add sublist support // TODO add sublist support
public interface RListReactive<V> extends RCollectionReactive<V> { public interface RListReactive<V> extends RCollectionReactive<V> {
/**
* Loads elements by specified <code>indexes</code>
*
* @param indexes of elements
* @return elements
*/
Publisher<List<V>> get(int ...indexes);
/**
* Add <code>element</code> after <code>elementToFind</code>
*
* @param elementToFind - object to find
* @param element - object to add
* @return new list size
*/
Publisher<Integer> addAfter(V elementToFind, V element);
/**
* Add <code>element</code> before <code>elementToFind</code>
*
* @param elementToFind - object to find
* @param element - object to add
* @return new list size
*/
Publisher<Integer> addBefore(V elementToFind, V element);
Publisher<V> descendingIterator(); Publisher<V> descendingIterator();
Publisher<V> descendingIterator(int startIndex); Publisher<V> descendingIterator(int startIndex);
@ -50,5 +77,30 @@ public interface RListReactive<V> extends RCollectionReactive<V> {
Publisher<V> get(long index); Publisher<V> get(long index);
Publisher<V> remove(long index); Publisher<V> remove(long index);
/**
* Read all elements at once
*
* @return list of values
*/
Publisher<List<V>> readAll();
/**
* Trim list and remains elements only in specified range
* <tt>fromIndex</tt>, inclusive, and <tt>toIndex</tt>, inclusive.
*
* @param fromIndex - from index
* @param toIndex - to index
* @return void
*/
Publisher<Void> trim(int fromIndex, int toIndex);
/**
* Remove object by specified index
*
* @param index - index of object
* @return void
*/
Publisher<Void> fastRemove(long index);
} }

@ -40,6 +40,24 @@ import org.reactivestreams.Publisher;
*/ */
public interface RMapCacheReactive<K, V> extends RMapReactive<K, V> { public interface RMapCacheReactive<K, V> extends RMapReactive<K, V> {
/**
* Sets max size of the map.
* Superfluous elements are evicted using LRU algorithm.
*
* @param maxSize - max size
* @return void
*/
Publisher<Void> setMaxSize(int maxSize);
/**
* Tries to set max size of the map.
* Superfluous elements are evicted using LRU algorithm.
*
* @param maxSize - max size
* @return <code>true</code> if max size has been successfully set, otherwise <code>false</code>.
*/
Publisher<Boolean> trySetMaxSize(int maxSize);
/** /**
* If the specified key is not already associated * If the specified key is not already associated
* with a value, associate it with the given value. * with a value, associate it with the given value.
@ -205,5 +223,14 @@ public interface RMapCacheReactive<K, V> extends RMapReactive<K, V> {
*/ */
@Override @Override
Publisher<Integer> size(); Publisher<Integer> size();
/**
* Remaining time to live of map entry associated with a <code>key</code>.
*
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Publisher<Long> remainTimeToLive(K key);
} }

@ -26,7 +26,7 @@ import org.reactivestreams.Publisher;
* @param <K> key type * @param <K> key type
* @param <V> value type * @param <V> value type
*/ */
public interface RMultimapReactive<K, V> { public interface RMultimapReactive<K, V> extends RExpirableReactive {
/** /**
* Returns the number of key-value pairs in this multimap. * Returns the number of key-value pairs in this multimap.

@ -15,6 +15,7 @@
*/ */
package org.redisson.api; package org.redisson.api;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -39,4 +40,11 @@ public interface RSetCacheReactive<V> extends RCollectionReactive<V> {
@Override @Override
Publisher<Integer> size(); Publisher<Integer> size();
/**
* Read all elements at once
*
* @return values
*/
Publisher<Set<V>> readAll();
} }

@ -28,6 +28,15 @@ import org.reactivestreams.Publisher;
*/ */
public interface RSetReactive<V> extends RCollectionReactive<V> { public interface RSetReactive<V> extends RCollectionReactive<V> {
/**
* Removes and returns random elements from set
* in async mode
*
* @param amount of random values
* @return random values
*/
Publisher<Set<V>> removeRandom(int amount);
/** /**
* Removes and returns random element from set * Removes and returns random element from set
* in async mode * in async mode
@ -54,6 +63,13 @@ public interface RSetReactive<V> extends RCollectionReactive<V> {
*/ */
Publisher<Boolean> move(String destination, V member); Publisher<Boolean> move(String destination, V member);
/**
* Read all elements at once
*
* @return values
*/
Publisher<Set<V>> readAll();
/** /**
* Union sets specified by name and write to current set. * Union sets specified by name and write to current set.
* If current set already exists, it is overwritten. * If current set already exists, it is overwritten.
@ -81,6 +97,15 @@ public interface RSetReactive<V> extends RCollectionReactive<V> {
*/ */
Publisher<Long> diff(String... names); Publisher<Long> diff(String... names);
/**
* Diff sets specified by name with current set.
* Without current set state change.
*
* @param names - name of sets
* @return values
*/
Publisher<Set<V>> readDiff(String... names);
/** /**
* Intersection sets specified by name and write to current set. * Intersection sets specified by name and write to current set.
* If current set already exists, it is overwritten. * If current set already exists, it is overwritten.

@ -15,19 +15,16 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RMultimap; import org.redisson.api.RMultimap;
import org.redisson.api.RMultimapReactive; import org.redisson.api.RMultimapReactive;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
import org.redisson.misc.Hash;
import io.netty.buffer.ByteBuf;
import reactor.fn.Supplier; import reactor.fn.Supplier;
/** /**
@ -39,7 +36,7 @@ import reactor.fn.Supplier;
*/ */
abstract class RedissonBaseMultimapReactive<K, V> extends RedissonExpirableReactive implements RMultimapReactive<K, V> { abstract class RedissonBaseMultimapReactive<K, V> extends RedissonExpirableReactive implements RMultimapReactive<K, V> {
private final RMultimap<K, V> instance; protected final RMultimap<K, V> instance;
public RedissonBaseMultimapReactive(RMultimap<K, V> instance, CommandReactiveExecutor commandExecutor, String name) { public RedissonBaseMultimapReactive(RMultimap<K, V> instance, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name, instance); super(commandExecutor, name, instance);
@ -150,31 +147,45 @@ abstract class RedissonBaseMultimapReactive<K, V> extends RedissonExpirableReact
} }
}); });
} }
protected String hash(ByteBuf objectState) {
return Hash.hash128toBase64(objectState);
}
protected String hashAndRelease(ByteBuf objectState) { @Override
try { public Publisher<Boolean> delete() {
return Hash.hash128toBase64(objectState); return reactive(new Supplier<RFuture<Boolean>>() {
} finally { @Override
objectState.release(); public RFuture<Boolean> get() {
} return instance.deleteAsync();
}
});
} }
String getValuesName(String hash) { @Override
return "{" + getName() + "}:" + hash; public Publisher<Boolean> expire(final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.expireAsync(timeToLive, timeUnit);
}
});
} }
protected <T> Publisher<T> fastRemove(List<Object> mapKeys, List<Object> listKeys, RedisCommand<T> evalCommandType) { @Override
return commandExecutor.evalWriteReactive(getName(), codec, evalCommandType, public Publisher<Boolean> expireAt(final long timestamp) {
"local res = redis.call('hdel', KEYS[1], unpack(ARGV)); " + return reactive(new Supplier<RFuture<Boolean>>() {
"if res > 0 then " + @Override
"redis.call('del', unpack(KEYS, 2, #KEYS)); " + public RFuture<Boolean> get() {
"end; " + return instance.expireAtAsync(timestamp);
"return res; ", }
listKeys, mapKeys.toArray()); });
} }
@Override
public Publisher<Boolean> clearExpire() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.clearExpireAsync();
}
});
}
} }

@ -15,21 +15,20 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonListMultimap; import org.redisson.RedissonListMultimap;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RListMultimap;
import org.redisson.api.RListMultimapReactive; import org.redisson.api.RListMultimapReactive;
import org.redisson.api.RListReactive; import org.redisson.api.RListReactive;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
import io.netty.buffer.ByteBuf; import reactor.fn.Supplier;
/** /**
* *
@ -49,95 +48,39 @@ public class RedissonListMultimapReactive<K, V> extends RedissonBaseMultimapReac
} }
@Override @Override
public RListReactive<V> get(final K key) { public RListReactive<V> get(K key) {
final ByteBuf keyState = encodeMapKey(key); RList<V> list = ((RListMultimap<K, V>)instance).get(key);
final String keyHash = hashAndRelease(keyState); return new RedissonListReactive<V>(codec, commandExecutor, list.getName(), list);
final String setName = getValuesName(keyHash);
return new RedissonListReactive<V>(codec, commandExecutor, setName) {
@Override
public Publisher<Boolean> delete() {
ByteBuf keyState = encodeMapKey(key);
return RedissonListMultimapReactive.this.fastRemove(Arrays.<Object>asList(keyState), Arrays.<Object>asList(setName), RedisCommands.EVAL_BOOLEAN_AMOUNT);
}
@Override
public Publisher<Boolean> clearExpire() {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> expireAt(long timestamp) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Long> remainTimeToLive() {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Void> rename(String newName) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> renamenx(String newName) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
};
} }
@Override @Override
public Publisher<List<V>> getAll(K key) { public Publisher<List<V>> getAll(final K key) {
ByteBuf keyState = encodeMapKey(key); return reactive(new Supplier<RFuture<List<V>>>() {
String keyHash = hashAndRelease(keyState); @Override
String setName = getValuesName(keyHash); public RFuture<List<V>> get() {
return (RFuture<List<V>>)(Object)((RListMultimap<K, V>)instance).getAllAsync(key);
return commandExecutor.readReactive(getName(), codec, RedisCommands.LRANGE, setName, 0, -1); }
});
} }
@Override @Override
public Publisher<List<V>> removeAll(Object key) { public Publisher<List<V>> removeAll(final Object key) {
ByteBuf keyState = encodeMapKey(key); return reactive(new Supplier<RFuture<List<V>>>() {
String keyHash = hash(keyState); @Override
public RFuture<List<V>> get() {
String setName = getValuesName(keyHash); return (RFuture<List<V>>)(Object)((RListMultimap<K, V>)instance).removeAllAsync(key);
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LIST, }
"redis.call('hdel', KEYS[1], ARGV[1]); " + });
"local members = redis.call('lrange', KEYS[2], 0, -1); " +
"redis.call('del', KEYS[2]); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), keyState);
} }
@Override @Override
public Publisher<List<V>> replaceValues(K key, Iterable<? extends V> values) { public Publisher<List<V>> replaceValues(final K key, final Iterable<? extends V> values) {
List<Object> params = new ArrayList<Object>(); return reactive(new Supplier<RFuture<List<V>>>() {
ByteBuf keyState = encodeMapKey(key); @Override
params.add(keyState); public RFuture<List<V>> get() {
String keyHash = hash(keyState); return (RFuture<List<V>>)(Object)((RListMultimap<K, V>)instance).replaceValuesAsync(key, values);
params.add(keyHash); }
for (Object value : values) { });
ByteBuf valueState = encodeMapValue(value);
params.add(valueState);
}
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LIST,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"local members = redis.call('lrange', KEYS[2], 0, -1); " +
"redis.call('del', KEYS[2]); " +
"redis.call('rpush', KEYS[2], unpack(ARGV, 3, #ARGV)); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), params.toArray());
} }
} }

@ -63,10 +63,20 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
super(codec, commandExecutor, name, new RedissonList<V>(codec, commandExecutor, name, null)); super(codec, commandExecutor, name, new RedissonList<V>(codec, commandExecutor, name, null));
this.instance = (RListAsync<V>) super.instance; this.instance = (RListAsync<V>) super.instance;
} }
public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RListAsync<V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = (RListAsync<V>) super.instance;
}
@Override @Override
public Publisher<Integer> size() { public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.LLEN_INT, getName()); return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sizeAsync();
}
});
} }
@Override @Override
@ -143,7 +153,67 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}; };
} }
@Override
public Publisher<Void> trim(final int fromIndex, final int toIndex) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.trimAsync(fromIndex, toIndex);
}
});
}
@Override
public Publisher<Void> fastRemove(final long index) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.fastRemoveAsync(index);
}
});
}
@Override
public Publisher<List<V>> readAll() {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.readAllAsync();
}
});
}
@Override
public Publisher<Integer> addBefore(final V elementToFind, final V element) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.addBeforeAsync(elementToFind, element);
}
});
}
@Override
public Publisher<Integer> addAfter(final V elementToFind, final V element) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.addAfterAsync(elementToFind, element);
}
});
}
@Override
public Publisher<List<V>> get(final int ...indexes) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.getAsync(indexes);
}
});
}
@Override @Override
public Publisher<Integer> add(V e) { public Publisher<Integer> add(V e) {
return commandExecutor.writeReactive(getName(), codec, RPUSH, getName(), encode(e)); return commandExecutor.writeReactive(getName(), codec, RPUSH, getName(), encode(e));

@ -81,6 +81,36 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
this.mapCache = mapCache; this.mapCache = mapCache;
} }
@Override
public Publisher<Void> setMaxSize(final int maxSize) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return mapCache.setMaxSizeAsync(maxSize);
}
});
}
@Override
public Publisher<Boolean> trySetMaxSize(final int maxSize) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return mapCache.trySetMaxSizeAsync(maxSize);
}
});
}
@Override
public Publisher<Long> remainTimeToLive(final K key) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return mapCache.remainTimeToLiveAsync(key);
}
});
}
@Override @Override
public Publisher<Boolean> containsKey(final Object key) { public Publisher<Boolean> containsKey(final Object key) {
return reactive(new Supplier<RFuture<Boolean>>() { return reactive(new Supplier<RFuture<Boolean>>() {

@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -138,6 +139,16 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
Arrays.<Object>asList(getName()), System.currentTimeMillis(), timeoutDate, encode(value)); Arrays.<Object>asList(getName()), System.currentTimeMillis(), timeoutDate, encode(value));
} }
@Override
public Publisher<Set<V>> readAll() {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readAllAsync();
}
});
}
@Override @Override
public Publisher<Boolean> remove(final Object o) { public Publisher<Boolean> remove(final Object o) {
return reactive(new Supplier<RFuture<Boolean>>() { return reactive(new Supplier<RFuture<Boolean>>() {

@ -15,22 +15,20 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonSetMultimap; import org.redisson.RedissonSetMultimap;
import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.RSetMultimap;
import org.redisson.api.RSetMultimapReactive; import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive; import org.redisson.api.RSetReactive;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
import io.netty.buffer.ByteBuf; import reactor.fn.Supplier;
/** /**
* *
@ -50,95 +48,39 @@ public class RedissonSetMultimapReactive<K, V> extends RedissonBaseMultimapReact
} }
@Override @Override
public RSetReactive<V> get(final K key) { public RSetReactive<V> get(K key) {
final ByteBuf keyState = encodeMapKey(key); RSet<V> set = ((RSetMultimap<K, V>)instance).get(key);
final String keyHash = hashAndRelease(keyState); return new RedissonSetReactive<V>(codec, commandExecutor, set.getName(), set);
final String setName = getValuesName(keyHash);
return new RedissonSetReactive<V>(codec, commandExecutor, setName) {
@Override
public Publisher<Boolean> delete() {
ByteBuf keyState = encodeMapKey(key);
return RedissonSetMultimapReactive.this.fastRemove(Arrays.<Object>asList(keyState), Arrays.<Object>asList(setName), RedisCommands.EVAL_BOOLEAN_AMOUNT);
}
@Override
public Publisher<Boolean> clearExpire() {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> expireAt(long timestamp) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Long> remainTimeToLive() {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Void> rename(String newName) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> renamenx(String newName) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
};
} }
@Override @Override
public Publisher<Set<V>> getAll(K key) { public Publisher<Set<V>> getAll(final K key) {
ByteBuf keyState = encodeMapKey(key); return reactive(new Supplier<RFuture<Set<V>>>() {
String keyHash = hashAndRelease(keyState); @Override
String setName = getValuesName(keyHash); public RFuture<Set<V>> get() {
return (RFuture<Set<V>>)(Object)((RSetMultimap<K, V>)instance).getAllAsync(key);
return commandExecutor.readReactive(getName(), codec, RedisCommands.SMEMBERS, setName); }
});
} }
@Override @Override
public Publisher<Set<V>> removeAll(Object key) { public Publisher<Set<V>> removeAll(final Object key) {
ByteBuf keyState = encodeMapKey(key); return reactive(new Supplier<RFuture<Set<V>>>() {
String keyHash = hash(keyState); @Override
public RFuture<Set<V>> get() {
String setName = getValuesName(keyHash); return (RFuture<Set<V>>)(Object)((RSetMultimap<K, V>)instance).removeAllAsync(key);
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_SET, }
"redis.call('hdel', KEYS[1], ARGV[1]); " + });
"local members = redis.call('smembers', KEYS[2]); " +
"redis.call('del', KEYS[2]); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), keyState);
} }
@Override @Override
public Publisher<Set<V>> replaceValues(K key, Iterable<? extends V> values) { public Publisher<Set<V>> replaceValues(final K key, final Iterable<? extends V> values) {
List<Object> params = new ArrayList<Object>(); return reactive(new Supplier<RFuture<Set<V>>>() {
ByteBuf keyState = encodeMapKey(key); @Override
params.add(keyState); public RFuture<Set<V>> get() {
String keyHash = hash(keyState); return (RFuture<Set<V>>)(Object)((RSetMultimap<K, V>)instance).replaceValuesAsync(key, values);
params.add(keyHash); }
for (Object value : values) { });
ByteBuf valueState = encodeMapValue(value);
params.add(valueState);
}
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_SET,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"local members = redis.call('smembers', KEYS[2]); " +
"redis.call('del', KEYS[2]); " +
"redis.call('sadd', KEYS[2], unpack(ARGV, 3, #ARGV)); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), params.toArray());
} }
} }

@ -71,9 +71,24 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
return new PublisherAdder<V>(this).addAll(c); return new PublisherAdder<V>(this).addAll(c);
} }
@Override
public Publisher<Set<V>> removeRandom(final int amount) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.removeRandomAsync(amount);
}
});
}
@Override @Override
public Publisher<Integer> size() { public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.SCARD_INT, getName()); return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sizeAsync();
}
});
} }
@Override @Override
@ -85,6 +100,16 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
} }
}); });
} }
@Override
public Publisher<Set<V>> readAll() {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readAllAsync();
}
});
}
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) { private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos); return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos);
@ -199,6 +224,16 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SDIFFSTORE, args.toArray()); return commandExecutor.writeReactive(getName(), codec, RedisCommands.SDIFFSTORE, args.toArray());
} }
@Override
public Publisher<Set<V>> readDiff(final String... names) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readDiffAsync(names);
}
});
}
@Override @Override
public Publisher<Long> union(String... names) { public Publisher<Long> union(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 1); List<Object> args = new ArrayList<Object>(names.length + 1);

Loading…
Cancel
Save