Make RMapReactive and RMapCacheReactive interfaces match RMap and RMapCache. #936

pull/968/head
Nikita 8 years ago
parent 5348829c96
commit 01cefc5641

@ -21,6 +21,7 @@ import java.util.List;
import java.util.UUID;
import org.redisson.api.ClusterNode;
import org.redisson.api.MapOptions;
import org.redisson.api.Node;
import org.redisson.api.NodesGroup;
import org.redisson.api.RAtomicLongReactive;
@ -101,12 +102,12 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCacheReactive<K, V>(id, evictionScheduler, codec, commandExecutor, name);
return new RedissonMapCacheReactive<K, V>(evictionScheduler, codec, commandExecutor, name, null);
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name) {
return new RedissonMapCacheReactive<K, V>(id, evictionScheduler, commandExecutor, name);
return new RedissonMapCacheReactive<K, V>(evictionScheduler, commandExecutor, name, null);
}
@Override
@ -133,6 +134,8 @@ public class RedissonReactive implements RedissonReactiveClient {
return buckets;
}
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(String name) {
return new RedissonHyperLogLogReactive<V>(commandExecutor, name);
@ -155,12 +158,12 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RMapReactive<K, V> getMap(String name) {
return new RedissonMapReactive<K, V>(commandExecutor, name);
return new RedissonMapReactive<K, V>(commandExecutor, name, null);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec) {
return new RedissonMapReactive<K, V>(codec, commandExecutor, name);
return new RedissonMapReactive<K, V>(codec, commandExecutor, name, null);
}
@Override
@ -318,5 +321,28 @@ public class RedissonReactive implements RedissonReactiveClient {
protected void enableRedissonReferenceSupport() {
this.commandExecutor.enableRedissonReferenceSupport(this);
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMapCacheReactive<K, V>(evictionScheduler, codec, commandExecutor, name, options);
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, MapOptions<K, V> options) {
return new RedissonMapCacheReactive<K, V>(evictionScheduler, commandExecutor, name, options);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, MapOptions<K, V> options) {
return new RedissonMapReactive<K, V>(commandExecutor, name, options);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMapReactive<K, V>(codec, commandExecutor, name, options);
}
}

@ -40,10 +40,163 @@ import org.reactivestreams.Publisher;
*/
public interface RMapCacheReactive<K, V> extends RMapReactive<K, V> {
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* <p>
* Stores value mapped by key with specified time to live.
* Entry expires after specified time to live.
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit - time unit
* @return previous associated value
*/
Publisher<V> putIfAbsent(K key, V value, long ttl, TimeUnit unit);
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* <p>
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit - time unit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit - time unit
* <p>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return previous associated value
*/
Publisher<V> putIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* Stores value mapped by key with specified time to live.
* Entry expires after specified time to live.
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit - time unit
* @return previous associated value
*/
Publisher<V> put(K key, V value, long ttl, TimeUnit unit);
/**
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit - time unit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit - time unit
* <p>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return previous associated value
*/
Publisher<V> put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* Stores value mapped by key with specified time to live.
* Entry expires after specified time to live.
* <p>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
* <p>
* Works faster than usual {@link #put(Object, Object, long, TimeUnit)}
* as it not returns previous value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit - time unit
*
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
Publisher<Boolean> fastPut(K key, V value, long ttl, TimeUnit unit);
/**
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p>
* If the map previously contained a mapping for
* the key, the old value is replaced by the specified value.
* <p>
* Works faster than usual {@link #put(Object, Object, long, TimeUnit, long, TimeUnit)}
* as it not returns previous value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit - time unit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit - time unit
* <p>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
Publisher<Boolean> fastPut(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* <p>
* Stores value mapped by key with specified time to live and max idle time.
* Entry expires when specified time to live or max idle time has expired.
* <p>
* Works faster than usual {@link #putIfAbsent(Object, Object, long, TimeUnit, long, TimeUnit)}
* as it not returns previous value.
*
* @param key - map key
* @param value - map value
* @param ttl - time to live for key\value entry.
* If <code>0</code> then time to live doesn't affect entry expiration.
* @param ttlUnit - time unit
* @param maxIdleTime - max idle time for key\value entry.
* If <code>0</code> then max idle time doesn't affect entry expiration.
* @param maxIdleUnit - time unit
* <p>
* if <code>maxIdleTime</code> and <code>ttl</code> params are equal to <code>0</code>
* then entry stores infinitely.
*
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash
*/
Publisher<Boolean> fastPutIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit);
/**
* Returns the number of entries in cache.
* This number can reflects expired entries too

@ -15,10 +15,14 @@
*/
package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.reactivestreams.Publisher;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
/**
* map functions
@ -30,8 +34,56 @@ import org.reactivestreams.Publisher;
*/
public interface RMapReactive<K, V> extends RExpirableReactive {
/**
* Loads all map entries to this Redis map.
*
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.
* @param parallelism - parallelism level, used to increase speed of process execution
* @return void
*/
Publisher<Void> loadAll(boolean replaceExistingValues, int parallelism);
/**
* Loads map entries whose keys are listed in defined <code>keys</code> parameter.
*
* @param keys - map keys
* @param replaceExistingValues - <code>true</code> if existed values should be replaced, <code>false</code> otherwise.
* @param parallelism - parallelism level, used to increase speed of process execution
* @return void
*/
Publisher<Void> loadAll(Set<? extends K> keys, boolean replaceExistingValues, int parallelism);
/**
* Returns size of value mapped by key in bytes
*
* @param key - map key
* @return size of value
*/
Publisher<Integer> valueSize(K key);
/**
* Gets a map slice contained the mappings with defined <code>keys</code>
* by one operation.
* <p>
* If map doesn't contain value/values for specified key/keys and {@link MapLoader} is defined
* then value/values will be loaded in read-through mode.
* <p>
* The returned map is <b>NOT</b> backed by the original map.
*
* @param keys - map keys
* @return Map slice
*/
Publisher<Map<K, V>> getAll(Set<K> keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
* <p>
* If {@link MapWriter} is defined then new map entries are stored in write-through mode.
*
* @param map mappings to be stored in this map
* @return void
*/
Publisher<Void> putAll(Map<? extends K, ? extends V> map);
Publisher<V> addAndGet(K key, Number value);
@ -43,10 +95,12 @@ public interface RMapReactive<K, V> extends RExpirableReactive {
Publisher<Integer> size();
/**
* Removes <code>keys</code> from map by one operation in manner
*
* Works faster than <code>RMap.remove</code> but not returning
* the value associated with <code>key</code>
* Removes <code>keys</code> from map by one operation in async manner.
* <p>
* Works faster than <code>{@link #remove(Object, Object)}</code> but doesn't return
* the value associated with <code>key</code>.
* <p>
* If {@link MapWriter} is defined then <code>keys</code>are deleted in write-through mode.
*
* @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
@ -55,30 +109,147 @@ public interface RMapReactive<K, V> extends RExpirableReactive {
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in manner.
*
* Works faster than <code>RMap.put</code> but not returning
* in async manner.
* <p>
* Works faster than <code>{@link #put(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if key is a new key in the hash and value was set.
* @return <code>true</code> if key is a new one in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
Publisher<Boolean> fastPut(K key, V value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.
* <p>
* Works faster than <code>{@link #putIfAbsent(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if key is a new one in the hash and value was set.
* <code>false</code> if key already exists in the hash and change hasn't been made.
*/
Publisher<Boolean> fastPutIfAbsent(K key, V value);
/**
* Read all keys at once
*
* @return keys
*/
Publisher<Set<K>> readAllKeySet();
/**
* Read all values at once
*
* @return values
*/
Publisher<Collection<V>> readAllValues();
/**
* Read all map entries at once
*
* @return entries
*/
Publisher<Set<Entry<K, V>>> readAllEntrySet();
/**
* Read all map as local instance at once
*
* @return map
*/
Publisher<Map<K, V>> readAllMap();
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
* <p>
* If map doesn't contain value for specified key and {@link MapLoader} is defined
* then value will be loaded in read-through mode.
*
* @param key the key whose associated value is to be returned
* @return the value to which the specified key is mapped, or
* {@code null} if this map contains no mapping for the key
*/
Publisher<V> get(K key);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in async manner.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return previous associated value
*/
Publisher<V> put(K key, V value);
/**
* Removes <code>key</code> from map and returns associated value in async manner.
* <p>
* If {@link MapWriter} is defined then <code>key</code>is deleted in write-through mode.
*
* @param key - map key
* @return deleted value or <code>null</code> if there wasn't any association
*/
Publisher<V> remove(K key);
/**
* Replaces previous value with a new <code>value</code> associated with the <code>key</code>.
* If there wasn't any association before then method returns <code>null</code>.
* <p>
* If {@link MapWriter} is defined then new <code>value</code>is written in write-through mode.
*
* @param key - map key
* @param value - map value
* @return previous associated value
* or <code>null</code> if there wasn't any association and change hasn't been made
*/
Publisher<V> replace(K key, V value);
/**
* Replaces previous <code>oldValue</code> with a <code>newValue</code> associated with the <code>key</code>.
* If previous value doesn't exist or equal to <code>oldValue</code> then method returns <code>false</code>.
* <p>
* If {@link MapWriter} is defined then <code>newValue</code>is written in write-through mode.
*
* @param key - map key
* @param oldValue - map old value
* @param newValue - map new value
* @return <code>true</code> if value has been replaced otherwise <code>false</code>.
*/
Publisher<Boolean> replace(K key, V oldValue, V newValue);
/**
* Removes <code>key</code> from map only if it associated with <code>value</code>.
* <p>
* If {@link MapWriter} is defined then <code>key</code>is deleted in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if map entry has been replaced otherwise <code>false</code>.
*/
Publisher<Boolean> remove(Object key, Object value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>null</code> if key is a new one in the hash and value was set.
* Previous value if key already exists in the hash and change hasn't been made.
*/
Publisher<V> putIfAbsent(K key, V value);
Publisher<Map.Entry<K, V>> entryIterator();

@ -58,9 +58,9 @@ public interface RedissonReactiveClient {
/**
* Returns map-based cache instance by name
* using provided codec for both cache keys and values.
* Supports entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.</p>
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.
*
* @param <K> type of keys
* @param <V> type of values
@ -70,11 +70,27 @@ public interface RedissonReactiveClient {
*/
<K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec);
/**
* Returns map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, Codec, MapOptions)}.
*
* @param <K> type of key
* @param <V> type of value
* @param name - object name
* @param codec - codec for keys and values
* @param options - map options
* @return MapCache object
*/
<K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options);
/**
* Returns map-based cache instance by name.
* Supports entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getMap(String)}.</p>
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String)}.
*
* @param <K> type of keys
* @param <V> type of values
@ -83,6 +99,20 @@ public interface RedissonReactiveClient {
*/
<K, V> RMapCacheReactive<K, V> getMapCache(String name);
/**
* Returns map-based cache instance by name.
* Supports entry eviction with a given MaxIdleTime and TTL settings.
* <p>
* If eviction is not required then it's better to use regular map {@link #getMap(String, MapOptions)}.</p>
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param options - map options
* @return MapCache object
*/
<K, V> RMapCacheReactive<K, V> getMapCache(String name, MapOptions<K, V> options);
/**
* Returns object holder instance by name
*
@ -162,6 +192,17 @@ public interface RedissonReactiveClient {
*/
<K, V> RMapReactive<K, V> getMap(String name);
/**
* Returns map instance by name.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param options - map options
* @return Map object
*/
<K, V> RMapReactive<K, V> getMap(String name, MapOptions<K, V> options);
/**
* Returns map instance by name
* using provided codec for both map keys and values.
@ -174,6 +215,19 @@ public interface RedissonReactiveClient {
*/
<K, V> RMapReactive<K, V> getMap(String name, Codec codec);
/**
* Returns map instance by name
* using provided codec for both map keys and values.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param codec - codec for keys and values
* @param options - map options
* @return Map object
*/
<K, V> RMapReactive<K, V> getMap(String name, Codec codec, MapOptions<K, V> options);
/**
* Returns set instance by name.
*

@ -87,22 +87,22 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public <K, V> RMapReactive<K, V> getMap(String name) {
return new RedissonMapReactive<K, V>(executorService, name);
return new RedissonMapReactive<K, V>(executorService, name, null);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec) {
return new RedissonMapReactive<K, V>(codec, executorService, name);
return new RedissonMapReactive<K, V>(codec, executorService, name, null);
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCacheReactive<K, V>(id, evictionScheduler, codec, executorService, name);
return new RedissonMapCacheReactive<K, V>(evictionScheduler, codec, executorService, name, null);
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name) {
return new RedissonMapCacheReactive<K, V>(id, evictionScheduler, executorService, name);
return new RedissonMapCacheReactive<K, V>(evictionScheduler, executorService, name, null);
}
@Override

@ -16,24 +16,20 @@
package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMapCache;
import org.redisson.api.RMapCache;
import org.redisson.api.MapOptions;
import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.eviction.EvictionScheduler;
@ -63,16 +59,16 @@ import reactor.rx.Streams;
*/
public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive implements RMapCacheReactive<K, V>, MapReactive<K, V> {
private final RMapCache<K, V> mapCache;
private final RMapCacheAsync<K, V> mapCache;
public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, null, null);
this.mapCache = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, options);
}
public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(codec, commandExecutor, name);
this.mapCache = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, null, null);
this.mapCache = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, options);
}
@Override
@ -110,10 +106,6 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
return reactive(mapCache.putAsync(key, value, ttl, unit));
}
String getTimeoutSetName() {
return "redisson__timeout__set__{" + getName() + "}";
}
@Override
public Publisher<V> remove(K key) {
return reactive(mapCache.removeAsync(key));
@ -300,4 +292,72 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
}).next().poll();
}
@Override
public Publisher<Void> loadAll(boolean replaceExistingValues, int parallelism) {
return reactive(mapCache.loadAllAsync(replaceExistingValues, parallelism));
}
@Override
public Publisher<Void> loadAll(Set<? extends K> keys, boolean replaceExistingValues, int parallelism) {
return reactive(mapCache.loadAllAsync(keys, replaceExistingValues, parallelism));
}
@Override
public Publisher<Integer> valueSize(K key) {
return reactive(mapCache.valueSizeAsync(key));
}
@Override
public Publisher<Boolean> fastPutIfAbsent(K key, V value) {
return reactive(mapCache.fastPutIfAbsentAsync(key, value));
}
@Override
public Publisher<Set<K>> readAllKeySet() {
return reactive(mapCache.readAllKeySetAsync());
}
@Override
public Publisher<Collection<V>> readAllValues() {
return reactive(mapCache.readAllValuesAsync());
}
@Override
public Publisher<Set<Entry<K, V>>> readAllEntrySet() {
return reactive(mapCache.readAllEntrySetAsync());
}
@Override
public Publisher<Map<K, V>> readAllMap() {
return reactive(mapCache.readAllMapAsync());
}
@Override
public Publisher<V> putIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime,
TimeUnit maxIdleUnit) {
return reactive(mapCache.putIfAbsentAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit));
}
@Override
public Publisher<V> put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
return reactive(mapCache.putAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit));
}
@Override
public Publisher<Boolean> fastPut(K key, V value, long ttl, TimeUnit unit) {
return reactive(mapCache.fastPutAsync(key, value, ttl, unit));
}
@Override
public Publisher<Boolean> fastPut(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime,
TimeUnit maxIdleUnit) {
return reactive(mapCache.fastPutAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit));
}
@Override
public Publisher<Boolean> fastPutIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime,
TimeUnit maxIdleUnit) {
return reactive(mapCache.fastPutIfAbsentAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit));
}
}

@ -16,12 +16,15 @@
package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
import org.redisson.api.MapOptions;
import org.redisson.api.RMapAsync;
import org.redisson.api.RMapReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
@ -46,16 +49,56 @@ import reactor.rx.Streams;
*/
public class RedissonMapReactive<K, V> extends RedissonExpirableReactive implements RMapReactive<K, V>, MapReactive<K, V> {
private final RedissonMap<K, V> instance;
private final RMapAsync<K, V> instance;
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) {
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(commandExecutor, name);
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null, null, null);
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null, options);
}
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions<K, V> options) {
super(codec, commandExecutor, name);
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null, null, null);
instance = new RedissonMap<K, V>(codec, commandExecutor, name, null, options);
}
@Override
public Publisher<Void> loadAll(boolean replaceExistingValues, int parallelism) {
return reactive(instance.loadAllAsync(replaceExistingValues, parallelism));
}
@Override
public Publisher<Void> loadAll(Set<? extends K> keys, boolean replaceExistingValues, int parallelism) {
return reactive(instance.loadAllAsync(keys, replaceExistingValues, parallelism));
}
@Override
public Publisher<Boolean> fastPutIfAbsent(K key, V value) {
return reactive(instance.fastPutIfAbsentAsync(key, value));
}
@Override
public Publisher<Set<K>> readAllKeySet() {
return reactive(instance.readAllKeySetAsync());
}
@Override
public Publisher<Collection<V>> readAllValues() {
return reactive(instance.readAllValuesAsync());
}
@Override
public Publisher<Set<Entry<K, V>>> readAllEntrySet() {
return reactive(instance.readAllEntrySetAsync());
}
@Override
public Publisher<Map<K, V>> readAllMap() {
return reactive(instance.readAllMapAsync());
}
@Override
public Publisher<Integer> valueSize(K key) {
return reactive(instance.valueSizeAsync(key));
}
@Override

Loading…
Cancel
Save