From f0c7610e764f5888d16c41753c441e28a8a45203 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 21 Sep 2020 08:50:01 +0300 Subject: [PATCH] Feature - merge(), compute(), computeIfAbsent(), computeIfPresent() methods implemented for RMap-based objects. #3064 --- .../main/java/org/redisson/RedissonMap.java | 373 +++++++++++++++++- .../main/java/org/redisson/api/RMapAsync.java | 45 +++ .../java/org/redisson/api/RMapReactive.java | 45 +++ .../main/java/org/redisson/api/RMapRx.java | 45 +++ .../test/java/org/redisson/BaseMapTest.java | 67 +++- 5 files changed, 562 insertions(+), 13 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 53f4f0e5f..b83d97347 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -16,19 +16,9 @@ package org.redisson; import java.math.BigDecimal; -import java.util.AbstractCollection; -import java.util.AbstractSet; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.Function; import org.redisson.api.MapOptions; @@ -152,6 +142,365 @@ public class RedissonMap extends RedissonExpirable implements RMap { return get(sizeAsync()); } + @Override + public V merge(K key, V value, BiFunction remappingFunction) { + checkKey(key); + checkValue(value); + Objects.requireNonNull(remappingFunction); + + RLock lock = getLock(key); + lock.lock(); + try { + V oldValue = get(key); + V newValue = value; + if (oldValue != null) { + newValue = remappingFunction.apply(oldValue, value); + } + + if(newValue == null) { + fastRemove(key); + } else { + fastPut(key, newValue); + } + return newValue; + } finally { + lock.unlock(); + } + } + + @Override + public RFuture mergeAsync(K key, V value, BiFunction remappingFunction) { + checkKey(key); + checkValue(value); + Objects.requireNonNull(remappingFunction); + + RLock lock = getLock(key); + RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); + lock.lockAsync(threadId).onComplete((r, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + RFuture oldValueFuture = getAsync(key); + oldValueFuture.onComplete((oldValue, ex) -> { + if (ex != null) { + lock.unlockAsync(threadId); + result.tryFailure(ex); + return; + } + + RPromise newValuePromise = new RedissonPromise<>(); + if (oldValue != null) { + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + V newValue; + try { + newValue = remappingFunction.apply(oldValue, value); + } catch (Exception exception) { + lock.unlockAsync(threadId); + result.tryFailure(exception); + return; + } + newValuePromise.trySuccess(newValue); + }); + } else { + newValuePromise.trySuccess(value); + } + newValuePromise.onComplete((newValue, ee) -> { + RFuture future; + if (newValue != null) { + future = fastPutAsync(key, newValue); + } else { + future = fastRemoveAsync(key); + } + future.onComplete((res, exc) -> { + lock.unlockAsync(threadId); + if (exc != null) { + result.tryFailure(exc); + return; + } + + result.trySuccess(newValue); + }); + }); + }); + }); + return result; + } + + @Override + public RFuture computeAsync(K key, BiFunction remappingFunction) { + checkKey(key); + Objects.requireNonNull(remappingFunction); + + RLock lock = getLock(key); + RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); + lock.lockAsync(threadId).onComplete((r, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + RFuture oldValueFuture = getAsync(key); + oldValueFuture.onComplete((oldValue, ex) -> { + if (ex != null) { + lock.unlockAsync(threadId); + result.tryFailure(ex); + return; + } + + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + V newValue; + try { + newValue = remappingFunction.apply(key, oldValue); + } catch (Exception exception) { + lock.unlockAsync(threadId); + result.tryFailure(exception); + return; + } + + if (newValue == null) { + if (oldValue != null) { + fastRemoveAsync(key).onComplete((res, exc) -> { + lock.unlockAsync(threadId); + if (exc != null) { + result.tryFailure(exc); + return; + } + + result.trySuccess(newValue); + }); + return; + } + } else { + fastPutAsync(key, newValue).onComplete((res, exc) -> { + lock.unlockAsync(threadId); + if (exc != null) { + result.tryFailure(exc); + return; + } + + result.trySuccess(newValue); + }); + return; + } + + lock.unlockAsync(threadId); + result.trySuccess(newValue); + }); + }); + }); + return result; + + } + + @Override + public V compute(K key, BiFunction remappingFunction) { + checkKey(key); + Objects.requireNonNull(remappingFunction); + + RLock lock = getLock(key); + lock.lock(); + try { + V oldValue = get(key); + + V newValue = remappingFunction.apply(key, oldValue); + if (newValue == null) { + if (oldValue != null) { + fastRemove(key); + } + } else { + fastPut(key, newValue); + } + return newValue; + } finally { + lock.unlock(); + } + } + + @Override + public RFuture computeIfAbsentAsync(K key, Function mappingFunction) { + checkKey(key); + Objects.requireNonNull(mappingFunction); + + RLock lock = getLock(key); + RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); + lock.lockAsync(threadId).onComplete((r, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + RFuture oldValueFuture = getAsync(key); + oldValueFuture.onComplete((oldValue, ex) -> { + if (ex != null) { + lock.unlockAsync(threadId); + result.tryFailure(ex); + return; + } + + if (oldValue != null) { + lock.unlockAsync(threadId); + result.trySuccess(oldValue); + return; + } + + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + V newValue; + try { + newValue = mappingFunction.apply(key); + } catch (Exception exception) { + lock.unlockAsync(threadId); + result.tryFailure(exception); + return; + } + if (newValue != null) { + fastPutAsync(key, newValue).onComplete((res, exc) -> { + lock.unlockAsync(threadId); + if (exc != null) { + result.tryFailure(exc); + return; + } + + result.trySuccess(newValue); + }); + return; + } + + lock.unlockAsync(threadId); + result.trySuccess(null); + }); + }); + }); + return result; + } + + @Override + public V computeIfAbsent(K key, Function mappingFunction) { + checkKey(key); + Objects.requireNonNull(mappingFunction); + + RLock lock = getLock(key); + lock.lock(); + try { + V value = get(key); + if (value == null) { + V newValue = mappingFunction.apply(key); + if (newValue != null) { + fastPut(key, newValue); + return newValue; + } + return null; + } + return value; + } finally { + lock.unlock(); + } + } + + @Override + public RFuture computeIfPresentAsync(K key, BiFunction remappingFunction) { + checkKey(key); + Objects.requireNonNull(remappingFunction); + + RLock lock = getLock(key); + RPromise result = new RedissonPromise<>(); + long threadId = Thread.currentThread().getId(); + lock.lockAsync(threadId).onComplete((r, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + RFuture oldValueFuture = getAsync(key); + oldValueFuture.onComplete((oldValue, ex) -> { + if (ex != null) { + lock.unlockAsync(threadId); + result.tryFailure(e); + return; + } + + if (oldValue == null) { + lock.unlockAsync(threadId); + result.trySuccess(null); + return; + } + + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + V newValue; + try { + newValue = remappingFunction.apply(key, oldValue); + } catch (Exception exception) { + lock.unlockAsync(threadId); + result.tryFailure(exception); + return; + } + if (newValue != null) { + RFuture replaceFuture = replaceAsync(key, oldValue, newValue); + replaceFuture.onComplete((re, ex1) -> { + lock.unlockAsync(threadId); + if (ex1 != null) { + result.tryFailure(ex1); + return; + } + + if (re) { + result.trySuccess(newValue); + } else { + result.trySuccess(oldValue); + } + }); + } else if (remove(key, oldValue)) { + RFuture removeFuture = removeAsync(key, oldValue); + removeFuture.onComplete((re, ex1) -> { + lock.unlockAsync(threadId); + if (ex1 != null) { + result.tryFailure(ex1); + return; + } + + if (re) { + result.trySuccess(null); + } else { + result.trySuccess(oldValue); + } + }); + } + }); + }); + }); + return result; + } + + @Override + public V computeIfPresent(K key, BiFunction remappingFunction) { + checkKey(key); + Objects.requireNonNull(remappingFunction); + + RLock lock = getLock(key); + lock.lock(); + try { + V oldValue = get(key); + if (oldValue == null) { + return null; + } + + V newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + if (replace(key, oldValue, newValue)) { + return newValue; + } + } else if (remove(key, oldValue)) { + return null; + } + return oldValue; + } finally { + lock.unlock(); + } + } + @Override public RFuture sizeAsync() { return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName()); diff --git a/redisson/src/main/java/org/redisson/api/RMapAsync.java b/redisson/src/main/java/org/redisson/api/RMapAsync.java index 5986c2d21..ff6c99c0d 100644 --- a/redisson/src/main/java/org/redisson/api/RMapAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapAsync.java @@ -22,6 +22,8 @@ import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; /** * Async interface for Redis based implementation @@ -37,6 +39,49 @@ import java.util.Set; */ public interface RMapAsync extends RExpirableAsync { + /** + * Associates specified key with the given value if key isn't already associated with a value. + * Otherwise, replaces the associated value with the results of the given + * remapping function, or removes if the result is {@code null}. + * + * @param key - map key + * @param value - value to be merged with the existing value + * associated with the key or to be associated with the key, + * if no existing value + * @param remappingFunction - the function is invoked with the existing value to compute new value + * @return new value associated with the specified key or + * {@code null} if no value associated with the key + */ + RFuture mergeAsync(K key, V value, BiFunction remappingFunction); + + /** + * Computes a new mapping for the specified key and its current mapped value. + * + * @param key - map key + * @param remappingFunction - function to compute a value + * @return the new value associated with the specified key, or {@code null} if none + */ + RFuture computeAsync(K key, BiFunction remappingFunction); + + /** + * Computes a mapping for the specified key if it's not mapped before. + * + * @param key - map key + * @param mappingFunction - function to compute a value + * @return current or new computed value associated with + * the specified key, or {@code null} if the computed value is null + */ + RFuture computeIfAbsentAsync(K key, Function mappingFunction); + + /** + * Computes a mapping for the specified key only if it's already mapped. + * + * @param key - map key + * @param remappingFunction - function to compute a value + * @return the new value associated with the specified key, or null if none + */ + RFuture computeIfPresentAsync(K key, BiFunction remappingFunction); + /** * Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}. * diff --git a/redisson/src/main/java/org/redisson/api/RMapReactive.java b/redisson/src/main/java/org/redisson/api/RMapReactive.java index 6a4f92160..dfbca886a 100644 --- a/redisson/src/main/java/org/redisson/api/RMapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapReactive.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; /** * Reactive interface for Redis based implementation @@ -39,6 +41,49 @@ import java.util.Set; */ public interface RMapReactive extends RExpirableReactive { + /** + * Associates specified key with the given value if key isn't already associated with a value. + * Otherwise, replaces the associated value with the results of the given + * remapping function, or removes if the result is {@code null}. + * + * @param key - map key + * @param value - value to be merged with the existing value + * associated with the key or to be associated with the key, + * if no existing value + * @param remappingFunction - the function is invoked with the existing value to compute new value + * @return new value associated with the specified key or + * {@code null} if no value associated with the key + */ + Mono merge(K key, V value, BiFunction remappingFunction); + + /** + * Computes a new mapping for the specified key and its current mapped value. + * + * @param key - map key + * @param remappingFunction - function to compute a value + * @return the new value associated with the specified key, or {@code null} if none + */ + Mono compute(K key, BiFunction remappingFunction); + + /** + * Computes a mapping for the specified key if it's not mapped before. + * + * @param key - map key + * @param mappingFunction - function to compute a value + * @return current or new computed value associated with + * the specified key, or {@code null} if the computed value is null + */ + Mono computeIfAbsent(K key, Function mappingFunction); + + /** + * Computes a mapping for the specified key only if it's already mapped. + * + * @param key - map key + * @param remappingFunction - function to compute a value + * @return the new value associated with the specified key, or null if none + */ + Mono computeIfPresent(K key, BiFunction remappingFunction); + /** * Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}. * diff --git a/redisson/src/main/java/org/redisson/api/RMapRx.java b/redisson/src/main/java/org/redisson/api/RMapRx.java index 847f2d822..2c053d928 100644 --- a/redisson/src/main/java/org/redisson/api/RMapRx.java +++ b/redisson/src/main/java/org/redisson/api/RMapRx.java @@ -19,6 +19,8 @@ import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; import org.redisson.api.map.MapLoader; import org.redisson.api.map.MapWriter; @@ -42,6 +44,49 @@ import io.reactivex.Single; */ public interface RMapRx extends RExpirableRx { + /** + * Associates specified key with the given value if key isn't already associated with a value. + * Otherwise, replaces the associated value with the results of the given + * remapping function, or removes if the result is {@code null}. + * + * @param key - map key + * @param value - value to be merged with the existing value + * associated with the key or to be associated with the key, + * if no existing value + * @param remappingFunction - the function is invoked with the existing value to compute new value + * @return new value associated with the specified key or + * {@code null} if no value associated with the key + */ + Maybe merge(K key, V value, BiFunction remappingFunction); + + /** + * Computes a new mapping for the specified key and its current mapped value. + * + * @param key - map key + * @param remappingFunction - function to compute a value + * @return the new value associated with the specified key, or {@code null} if none + */ + Maybe compute(K key, BiFunction remappingFunction); + + /** + * Computes a mapping for the specified key if it's not mapped before. + * + * @param key - map key + * @param mappingFunction - function to compute a value + * @return current or new computed value associated with + * the specified key, or {@code null} if the computed value is null + */ + Maybe computeIfAbsent(K key, Function mappingFunction); + + /** + * Computes a mapping for the specified key only if it's already mapped. + * + * @param key - map key + * @param remappingFunction - function to compute a value + * @return the new value associated with the specified key, or null if none + */ + Maybe computeIfPresent(K key, BiFunction remappingFunction); + /** * Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}. * diff --git a/redisson/src/test/java/org/redisson/BaseMapTest.java b/redisson/src/test/java/org/redisson/BaseMapTest.java index db0ade490..51bb0a96b 100644 --- a/redisson/src/test/java/org/redisson/BaseMapTest.java +++ b/redisson/src/test/java/org/redisson/BaseMapTest.java @@ -143,7 +143,72 @@ public abstract class BaseMapTest extends BaseTest { ((RDestroyable) map).destroy(); } } - + + @Test + public void testComputeIfPresent() { + RMap map = getMap("map"); + map.computeIfPresent("1", (key, value) -> { + return "12"; + }); + assertThat(map.get("1")).isNull(); + + map.put("1", "10"); + map.computeIfPresent("1", (key, value) -> { + assertThat(value).isEqualTo("10"); + return "12"; + }); + assertThat(map.get("1")).isEqualTo("12"); + } + + @Test + public void testComputeIfAbsent() { + RMap map = getMap("map"); + map.computeIfAbsent("1", (key) -> { + assertThat(key).isEqualTo("1"); + return "12"; + }); + assertThat(map.get("1")).isEqualTo("12"); + + map.computeIfAbsent("1", (key) -> { + assertThat(key).isEqualTo("1"); + return "13"; + }); + assertThat(map.get("1")).isEqualTo("12"); + } + + @Test + public void testMerge() { + RMap map = getMap("map"); + map.merge("1", "2", (key, oldValue) -> { + return "12"; + }); + assertThat(map.get("1")).isEqualTo("2"); + + map.merge("1", "2", (key, oldValue) -> { + return "12"; + }); + assertThat(map.get("1")).isEqualTo("12"); + + map.merge("1", "2", (key, oldValue) -> { + return null; + }); + assertThat(map.get("1")).isNull(); + } + + @Test + public void testCompute() { + RMap map = getMap("map"); + map.compute("1", (key, oldValue) -> { + return "12"; + }); + assertThat(map.get("1")).isEqualTo("12"); + + map.compute("1", (key, oldValue) -> { + return null; + }); + assertThat(map.get("1")).isNull(); + } + @Test public void testGetAllWithStringKeys() { RMap map = getMap("getAllStrings");