diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index f1514d3f4..3ee61de18 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.UUID; import org.redisson.api.ClusterNode; +import org.redisson.api.MapOptions; import org.redisson.api.Node; import org.redisson.api.NodesGroup; import org.redisson.api.RAtomicLongReactive; @@ -101,12 +102,12 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RMapCacheReactive getMapCache(String name, Codec codec) { - return new RedissonMapCacheReactive(id, evictionScheduler, codec, commandExecutor, name); + return new RedissonMapCacheReactive(evictionScheduler, codec, commandExecutor, name, null); } @Override public RMapCacheReactive getMapCache(String name) { - return new RedissonMapCacheReactive(id, evictionScheduler, commandExecutor, name); + return new RedissonMapCacheReactive(evictionScheduler, commandExecutor, name, null); } @Override @@ -133,6 +134,8 @@ public class RedissonReactive implements RedissonReactiveClient { return buckets; } + + @Override public RHyperLogLogReactive getHyperLogLog(String name) { return new RedissonHyperLogLogReactive(commandExecutor, name); @@ -155,12 +158,12 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RMapReactive getMap(String name) { - return new RedissonMapReactive(commandExecutor, name); + return new RedissonMapReactive(commandExecutor, name, null); } @Override public RMapReactive getMap(String name, Codec codec) { - return new RedissonMapReactive(codec, commandExecutor, name); + return new RedissonMapReactive(codec, commandExecutor, name, null); } @Override @@ -318,5 +321,28 @@ public class RedissonReactive implements RedissonReactiveClient { protected void enableRedissonReferenceSupport() { this.commandExecutor.enableRedissonReferenceSupport(this); } + + @Override + public RMapCacheReactive getMapCache(String name, Codec codec, MapOptions options) { + return new RedissonMapCacheReactive(evictionScheduler, codec, commandExecutor, name, options); + } + + + @Override + public RMapCacheReactive getMapCache(String name, MapOptions options) { + return new RedissonMapCacheReactive(evictionScheduler, commandExecutor, name, options); + } + + + @Override + public RMapReactive getMap(String name, MapOptions options) { + return new RedissonMapReactive(commandExecutor, name, options); + } + + + @Override + public RMapReactive getMap(String name, Codec codec, MapOptions options) { + return new RedissonMapReactive(codec, commandExecutor, name, options); + } } diff --git a/redisson/src/main/java/org/redisson/api/RMapCacheReactive.java b/redisson/src/main/java/org/redisson/api/RMapCacheReactive.java index 790b4822d..8840e59a8 100644 --- a/redisson/src/main/java/org/redisson/api/RMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapCacheReactive.java @@ -40,10 +40,163 @@ import org.reactivestreams.Publisher; */ public interface RMapCacheReactive extends RMapReactive { + /** + * If the specified key is not already associated + * with a value, associate it with the given value. + *

+ * Stores value mapped by key with specified time to live. + * Entry expires after specified time to live. + * If the map previously contained a mapping for + * the key, the old value is replaced by the specified value. + * + * @param key - map key + * @param value - map value + * @param ttl - time to live for key\value entry. + * If 0 then stores infinitely. + * @param unit - time unit + * @return previous associated value + */ Publisher putIfAbsent(K key, V value, long ttl, TimeUnit unit); + /** + * If the specified key is not already associated + * with a value, associate it with the given value. + *

+ * Stores value mapped by key with specified time to live and max idle time. + * Entry expires when specified time to live or max idle time has expired. + *

+ * If the map previously contained a mapping for + * the key, the old value is replaced by the specified value. + * + * @param key - map key + * @param value - map value + * @param ttl - time to live for key\value entry. + * If 0 then time to live doesn't affect entry expiration. + * @param ttlUnit - time unit + * @param maxIdleTime - max idle time for key\value entry. + * If 0 then max idle time doesn't affect entry expiration. + * @param maxIdleUnit - time unit + *

+ * if maxIdleTime and ttl params are equal to 0 + * then entry stores infinitely. + * + * @return previous associated value + */ + Publisher putIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit); + + /** + * Stores value mapped by key with specified time to live. + * Entry expires after specified time to live. + * If the map previously contained a mapping for + * the key, the old value is replaced by the specified value. + * + * @param key - map key + * @param value - map value + * @param ttl - time to live for key\value entry. + * If 0 then stores infinitely. + * @param unit - time unit + * @return previous associated value + */ Publisher put(K key, V value, long ttl, TimeUnit unit); + /** + * Stores value mapped by key with specified time to live and max idle time. + * Entry expires when specified time to live or max idle time has expired. + *

+ * If the map previously contained a mapping for + * the key, the old value is replaced by the specified value. + * + * @param key - map key + * @param value - map value + * @param ttl - time to live for key\value entry. + * If 0 then time to live doesn't affect entry expiration. + * @param ttlUnit - time unit + * @param maxIdleTime - max idle time for key\value entry. + * If 0 then max idle time doesn't affect entry expiration. + * @param maxIdleUnit - time unit + *

+ * if maxIdleTime and ttl params are equal to 0 + * then entry stores infinitely. + * + * @return previous associated value + */ + Publisher put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit); + + /** + * Stores value mapped by key with specified time to live. + * Entry expires after specified time to live. + *

+ * If the map previously contained a mapping for + * the key, the old value is replaced by the specified value. + *

+ * Works faster than usual {@link #put(Object, Object, long, TimeUnit)} + * as it not returns previous value. + * + * @param key - map key + * @param value - map value + * @param ttl - time to live for key\value entry. + * If 0 then stores infinitely. + * @param unit - time unit + * + * @return true if key is a new key in the hash and value was set. + * false if key already exists in the hash and the value was updated. + */ + Publisher fastPut(K key, V value, long ttl, TimeUnit unit); + + /** + * Stores value mapped by key with specified time to live and max idle time. + * Entry expires when specified time to live or max idle time has expired. + *

+ * If the map previously contained a mapping for + * the key, the old value is replaced by the specified value. + *

+ * Works faster than usual {@link #put(Object, Object, long, TimeUnit, long, TimeUnit)} + * as it not returns previous value. + * + * @param key - map key + * @param value - map value + * @param ttl - time to live for key\value entry. + * If 0 then time to live doesn't affect entry expiration. + * @param ttlUnit - time unit + * @param maxIdleTime - max idle time for key\value entry. + * If 0 then max idle time doesn't affect entry expiration. + * @param maxIdleUnit - time unit + *

+ * if maxIdleTime and ttl params are equal to 0 + * then entry stores infinitely. + + * @return true if key is a new key in the hash and value was set. + * false if key already exists in the hash and the value was updated. + */ + Publisher fastPut(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit); + + /** + * If the specified key is not already associated + * with a value, associate it with the given value. + *

+ * Stores value mapped by key with specified time to live and max idle time. + * Entry expires when specified time to live or max idle time has expired. + *

+ * Works faster than usual {@link #putIfAbsent(Object, Object, long, TimeUnit, long, TimeUnit)} + * as it not returns previous value. + * + * @param key - map key + * @param value - map value + * @param ttl - time to live for key\value entry. + * If 0 then time to live doesn't affect entry expiration. + * @param ttlUnit - time unit + * @param maxIdleTime - max idle time for key\value entry. + * If 0 then max idle time doesn't affect entry expiration. + * @param maxIdleUnit - time unit + *

+ * if maxIdleTime and ttl params are equal to 0 + * then entry stores infinitely. + * + * @return true if key is a new key in the hash and value was set. + * false if key already exists in the hash + */ + Publisher fastPutIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit); + /** * Returns the number of entries in cache. * This number can reflects expired entries too diff --git a/redisson/src/main/java/org/redisson/api/RMapReactive.java b/redisson/src/main/java/org/redisson/api/RMapReactive.java index f70104e85..2fd545104 100644 --- a/redisson/src/main/java/org/redisson/api/RMapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapReactive.java @@ -15,10 +15,14 @@ */ package org.redisson.api; +import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.Map.Entry; import org.reactivestreams.Publisher; +import org.redisson.api.map.MapLoader; +import org.redisson.api.map.MapWriter; /** * map functions @@ -30,8 +34,56 @@ import org.reactivestreams.Publisher; */ public interface RMapReactive extends RExpirableReactive { + /** + * Loads all map entries to this Redis map. + * + * @param replaceExistingValues - true if existed values should be replaced, false otherwise. + * @param parallelism - parallelism level, used to increase speed of process execution + * @return void + */ + Publisher loadAll(boolean replaceExistingValues, int parallelism); + + /** + * Loads map entries whose keys are listed in defined keys parameter. + * + * @param keys - map keys + * @param replaceExistingValues - true if existed values should be replaced, false otherwise. + * @param parallelism - parallelism level, used to increase speed of process execution + * @return void + */ + Publisher loadAll(Set keys, boolean replaceExistingValues, int parallelism); + + /** + * Returns size of value mapped by key in bytes + * + * @param key - map key + * @return size of value + */ + Publisher valueSize(K key); + + /** + * Gets a map slice contained the mappings with defined keys + * by one operation. + *

+ * If map doesn't contain value/values for specified key/keys and {@link MapLoader} is defined + * then value/values will be loaded in read-through mode. + *

+ * The returned map is NOT backed by the original map. + * + * @param keys - map keys + * @return Map slice + */ Publisher> getAll(Set keys); + /** + * Associates the specified value with the specified key + * in batch. + *

+ * If {@link MapWriter} is defined then new map entries are stored in write-through mode. + * + * @param map mappings to be stored in this map + * @return void + */ Publisher putAll(Map map); Publisher addAndGet(K key, Number value); @@ -43,10 +95,12 @@ public interface RMapReactive extends RExpirableReactive { Publisher size(); /** - * Removes keys from map by one operation in manner - * - * Works faster than RMap.remove but not returning - * the value associated with key + * Removes keys from map by one operation in async manner. + *

+ * Works faster than {@link #remove(Object, Object)} but doesn't return + * the value associated with key. + *

+ * If {@link MapWriter} is defined then keysare deleted in write-through mode. * * @param keys - map keys * @return the number of keys that were removed from the hash, not including specified but non existing keys @@ -55,30 +109,147 @@ public interface RMapReactive extends RExpirableReactive { /** * Associates the specified value with the specified key - * in manner. - * - * Works faster than RMap.put but not returning + * in async manner. + *

+ * Works faster than {@link #put(Object, Object)} but not returning * the previous value associated with key + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. * * @param key - map key * @param value - map value - * @return true if key is a new key in the hash and value was set. + * @return true if key is a new one in the hash and value was set. * false if key already exists in the hash and the value was updated. */ Publisher fastPut(K key, V value); + /** + * Associates the specified value with the specified key + * only if there is no any association with specifiedkey. + *

+ * Works faster than {@link #putIfAbsent(Object, Object)} but not returning + * the previous value associated with key + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return true if key is a new one in the hash and value was set. + * false if key already exists in the hash and change hasn't been made. + */ + Publisher fastPutIfAbsent(K key, V value); + + /** + * Read all keys at once + * + * @return keys + */ + Publisher> readAllKeySet(); + + /** + * Read all values at once + * + * @return values + */ + Publisher> readAllValues(); + + /** + * Read all map entries at once + * + * @return entries + */ + Publisher>> readAllEntrySet(); + + /** + * Read all map as local instance at once + * + * @return map + */ + Publisher> readAllMap(); + + /** + * Returns the value to which the specified key is mapped, + * or {@code null} if this map contains no mapping for the key. + *

+ * If map doesn't contain value for specified key and {@link MapLoader} is defined + * then value will be loaded in read-through mode. + * + * @param key the key whose associated value is to be returned + * @return the value to which the specified key is mapped, or + * {@code null} if this map contains no mapping for the key + */ Publisher get(K key); + /** + * Associates the specified value with the specified key + * in async manner. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return previous associated value + */ Publisher put(K key, V value); + /** + * Removes key from map and returns associated value in async manner. + *

+ * If {@link MapWriter} is defined then keyis deleted in write-through mode. + * + * @param key - map key + * @return deleted value or null if there wasn't any association + */ Publisher remove(K key); + /** + * Replaces previous value with a new value associated with the key. + * If there wasn't any association before then method returns null. + *

+ * If {@link MapWriter} is defined then new valueis written in write-through mode. + * + * @param key - map key + * @param value - map value + * @return previous associated value + * or null if there wasn't any association and change hasn't been made + */ Publisher replace(K key, V value); + /** + * Replaces previous oldValue with a newValue associated with the key. + * If previous value doesn't exist or equal to oldValue then method returns false. + *

+ * If {@link MapWriter} is defined then newValueis written in write-through mode. + * + * @param key - map key + * @param oldValue - map old value + * @param newValue - map new value + * @return true if value has been replaced otherwise false. + */ Publisher replace(K key, V oldValue, V newValue); + /** + * Removes key from map only if it associated with value. + *

+ * If {@link MapWriter} is defined then keyis deleted in write-through mode. + * + * @param key - map key + * @param value - map value + * @return true if map entry has been replaced otherwise false. + */ Publisher remove(Object key, Object value); + /** + * Associates the specified value with the specified key + * only if there is no any association with specifiedkey. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return null if key is a new one in the hash and value was set. + * Previous value if key already exists in the hash and change hasn't been made. + */ Publisher putIfAbsent(K key, V value); Publisher> entryIterator(); diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index f8f8abffc..67f0eff3e 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -58,9 +58,9 @@ public interface RedissonReactiveClient { /** * Returns map-based cache instance by name * using provided codec for both cache keys and values. - * Supports entry eviction with a given TTL value. - * - *

If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.

+ * Supports entry eviction with a given MaxIdleTime and TTL settings. + *

+ * If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}. * * @param type of keys * @param type of values @@ -70,11 +70,27 @@ public interface RedissonReactiveClient { */ RMapCacheReactive getMapCache(String name, Codec codec); + /** + * Returns map-based cache instance by name + * using provided codec for both cache keys and values. + * Supports entry eviction with a given MaxIdleTime and TTL settings. + *

+ * If eviction is not required then it's better to use regular map {@link #getMap(String, Codec, MapOptions)}. + * + * @param type of key + * @param type of value + * @param name - object name + * @param codec - codec for keys and values + * @param options - map options + * @return MapCache object + */ + RMapCacheReactive getMapCache(String name, Codec codec, MapOptions options); + /** * Returns map-based cache instance by name. - * Supports entry eviction with a given TTL value. - * - *

If eviction is not required then it's better to use regular map {@link #getMap(String)}.

+ * Supports entry eviction with a given MaxIdleTime and TTL settings. + *

+ * If eviction is not required then it's better to use regular map {@link #getMap(String)}. * * @param type of keys * @param type of values @@ -83,6 +99,20 @@ public interface RedissonReactiveClient { */ RMapCacheReactive getMapCache(String name); + /** + * Returns map-based cache instance by name. + * Supports entry eviction with a given MaxIdleTime and TTL settings. + *

+ * If eviction is not required then it's better to use regular map {@link #getMap(String, MapOptions)}.

+ * + * @param type of key + * @param type of value + * @param name - name of object + * @param options - map options + * @return MapCache object + */ + RMapCacheReactive getMapCache(String name, MapOptions options); + /** * Returns object holder instance by name * @@ -162,6 +192,17 @@ public interface RedissonReactiveClient { */ RMapReactive getMap(String name); + /** + * Returns map instance by name. + * + * @param type of key + * @param type of value + * @param name - name of object + * @param options - map options + * @return Map object + */ + RMapReactive getMap(String name, MapOptions options); + /** * Returns map instance by name * using provided codec for both map keys and values. @@ -174,6 +215,19 @@ public interface RedissonReactiveClient { */ RMapReactive getMap(String name, Codec codec); + /** + * Returns map instance by name + * using provided codec for both map keys and values. + * + * @param type of key + * @param type of value + * @param name - name of object + * @param codec - codec for keys and values + * @param options - map options + * @return Map object + */ + RMapReactive getMap(String name, Codec codec, MapOptions options); + /** * Returns set instance by name. * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 74eca37b4..5fdd72f59 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -87,22 +87,22 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public RMapReactive getMap(String name) { - return new RedissonMapReactive(executorService, name); + return new RedissonMapReactive(executorService, name, null); } @Override public RMapReactive getMap(String name, Codec codec) { - return new RedissonMapReactive(codec, executorService, name); + return new RedissonMapReactive(codec, executorService, name, null); } @Override public RMapCacheReactive getMapCache(String name, Codec codec) { - return new RedissonMapCacheReactive(id, evictionScheduler, codec, executorService, name); + return new RedissonMapCacheReactive(evictionScheduler, codec, executorService, name, null); } @Override public RMapCacheReactive getMapCache(String name) { - return new RedissonMapCacheReactive(id, evictionScheduler, executorService, name); + return new RedissonMapCacheReactive(evictionScheduler, executorService, name, null); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 2e710fa7c..c86107d1f 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -16,24 +16,20 @@ package org.redisson.reactive; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.UUID; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonMapCache; -import org.redisson.api.RMapCache; +import org.redisson.api.MapOptions; +import org.redisson.api.RMapCacheAsync; import org.redisson.api.RMapCacheReactive; import org.redisson.api.RMapReactive; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; -import org.redisson.client.protocol.decoder.NestedMultiDecoder; -import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; import org.redisson.eviction.EvictionScheduler; @@ -63,16 +59,16 @@ import reactor.rx.Streams; */ public class RedissonMapCacheReactive extends RedissonExpirableReactive implements RMapCacheReactive, MapReactive { - private final RMapCache mapCache; + private final RMapCacheAsync mapCache; - public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { + public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name, MapOptions options) { super(commandExecutor, name); - this.mapCache = new RedissonMapCache(evictionScheduler, commandExecutor, name, null, null, null); + this.mapCache = new RedissonMapCache(evictionScheduler, commandExecutor, name, null, options); } - public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name) { + public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options) { super(codec, commandExecutor, name); - this.mapCache = new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null, null, null); + this.mapCache = new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null, options); } @Override @@ -110,10 +106,6 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im return reactive(mapCache.putAsync(key, value, ttl, unit)); } - String getTimeoutSetName() { - return "redisson__timeout__set__{" + getName() + "}"; - } - @Override public Publisher remove(K key) { return reactive(mapCache.removeAsync(key)); @@ -300,4 +292,72 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im }).next().poll(); } + @Override + public Publisher loadAll(boolean replaceExistingValues, int parallelism) { + return reactive(mapCache.loadAllAsync(replaceExistingValues, parallelism)); + } + + @Override + public Publisher loadAll(Set keys, boolean replaceExistingValues, int parallelism) { + return reactive(mapCache.loadAllAsync(keys, replaceExistingValues, parallelism)); + } + + @Override + public Publisher valueSize(K key) { + return reactive(mapCache.valueSizeAsync(key)); + } + + @Override + public Publisher fastPutIfAbsent(K key, V value) { + return reactive(mapCache.fastPutIfAbsentAsync(key, value)); + } + + @Override + public Publisher> readAllKeySet() { + return reactive(mapCache.readAllKeySetAsync()); + } + + @Override + public Publisher> readAllValues() { + return reactive(mapCache.readAllValuesAsync()); + } + + @Override + public Publisher>> readAllEntrySet() { + return reactive(mapCache.readAllEntrySetAsync()); + } + + @Override + public Publisher> readAllMap() { + return reactive(mapCache.readAllMapAsync()); + } + + @Override + public Publisher putIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, + TimeUnit maxIdleUnit) { + return reactive(mapCache.putIfAbsentAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit)); + } + + @Override + public Publisher put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { + return reactive(mapCache.putAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit)); + } + + @Override + public Publisher fastPut(K key, V value, long ttl, TimeUnit unit) { + return reactive(mapCache.fastPutAsync(key, value, ttl, unit)); + } + + @Override + public Publisher fastPut(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, + TimeUnit maxIdleUnit) { + return reactive(mapCache.fastPutAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit)); + } + + @Override + public Publisher fastPutIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, + TimeUnit maxIdleUnit) { + return reactive(mapCache.fastPutIfAbsentAsync(key, value, ttl, ttlUnit, maxIdleTime, maxIdleUnit)); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index 18f9d2efa..e73ff60b9 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -16,12 +16,15 @@ package org.redisson.reactive; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.reactivestreams.Publisher; import org.redisson.RedissonMap; +import org.redisson.api.MapOptions; +import org.redisson.api.RMapAsync; import org.redisson.api.RMapReactive; import org.redisson.client.codec.Codec; import org.redisson.client.codec.MapScanCodec; @@ -46,16 +49,56 @@ import reactor.rx.Streams; */ public class RedissonMapReactive extends RedissonExpirableReactive implements RMapReactive, MapReactive { - private final RedissonMap instance; + private final RMapAsync instance; - public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) { + public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name, MapOptions options) { super(commandExecutor, name); - instance = new RedissonMap(codec, commandExecutor, name, null, null, null); + instance = new RedissonMap(codec, commandExecutor, name, null, options); } - public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, MapOptions options) { super(codec, commandExecutor, name); - instance = new RedissonMap(codec, commandExecutor, name, null, null, null); + instance = new RedissonMap(codec, commandExecutor, name, null, options); + } + + @Override + public Publisher loadAll(boolean replaceExistingValues, int parallelism) { + return reactive(instance.loadAllAsync(replaceExistingValues, parallelism)); + } + + @Override + public Publisher loadAll(Set keys, boolean replaceExistingValues, int parallelism) { + return reactive(instance.loadAllAsync(keys, replaceExistingValues, parallelism)); + } + + @Override + public Publisher fastPutIfAbsent(K key, V value) { + return reactive(instance.fastPutIfAbsentAsync(key, value)); + } + + @Override + public Publisher> readAllKeySet() { + return reactive(instance.readAllKeySetAsync()); + } + + @Override + public Publisher> readAllValues() { + return reactive(instance.readAllValuesAsync()); + } + + @Override + public Publisher>> readAllEntrySet() { + return reactive(instance.readAllEntrySetAsync()); + } + + @Override + public Publisher> readAllMap() { + return reactive(instance.readAllMapAsync()); + } + + @Override + public Publisher valueSize(K key) { + return reactive(instance.valueSizeAsync(key)); } @Override