Feature - merge(), compute(), computeIfAbsent(), computeIfPresent() methods implemented for RMap-based objects. #3064

pull/3083/head
Nikita Koksharov 4 years ago
parent 287dd1dc41
commit f0c7610e76

@ -16,19 +16,9 @@
package org.redisson;
import java.math.BigDecimal;
import java.util.AbstractCollection;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.redisson.api.MapOptions;
@ -152,6 +142,365 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(sizeAsync());
}
@Override
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
checkKey(key);
checkValue(value);
Objects.requireNonNull(remappingFunction);
RLock lock = getLock(key);
lock.lock();
try {
V oldValue = get(key);
V newValue = value;
if (oldValue != null) {
newValue = remappingFunction.apply(oldValue, value);
}
if(newValue == null) {
fastRemove(key);
} else {
fastPut(key, newValue);
}
return newValue;
} finally {
lock.unlock();
}
}
@Override
public RFuture<V> mergeAsync(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
checkKey(key);
checkValue(value);
Objects.requireNonNull(remappingFunction);
RLock lock = getLock(key);
RPromise<V> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
lock.lockAsync(threadId).onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
RFuture<V> oldValueFuture = getAsync(key);
oldValueFuture.onComplete((oldValue, ex) -> {
if (ex != null) {
lock.unlockAsync(threadId);
result.tryFailure(ex);
return;
}
RPromise<V> newValuePromise = new RedissonPromise<>();
if (oldValue != null) {
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = remappingFunction.apply(oldValue, value);
} catch (Exception exception) {
lock.unlockAsync(threadId);
result.tryFailure(exception);
return;
}
newValuePromise.trySuccess(newValue);
});
} else {
newValuePromise.trySuccess(value);
}
newValuePromise.onComplete((newValue, ee) -> {
RFuture<?> future;
if (newValue != null) {
future = fastPutAsync(key, newValue);
} else {
future = fastRemoveAsync(key);
}
future.onComplete((res, exc) -> {
lock.unlockAsync(threadId);
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(newValue);
});
});
});
});
return result;
}
@Override
public RFuture<V> computeAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
checkKey(key);
Objects.requireNonNull(remappingFunction);
RLock lock = getLock(key);
RPromise<V> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
lock.lockAsync(threadId).onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
RFuture<V> oldValueFuture = getAsync(key);
oldValueFuture.onComplete((oldValue, ex) -> {
if (ex != null) {
lock.unlockAsync(threadId);
result.tryFailure(ex);
return;
}
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = remappingFunction.apply(key, oldValue);
} catch (Exception exception) {
lock.unlockAsync(threadId);
result.tryFailure(exception);
return;
}
if (newValue == null) {
if (oldValue != null) {
fastRemoveAsync(key).onComplete((res, exc) -> {
lock.unlockAsync(threadId);
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(newValue);
});
return;
}
} else {
fastPutAsync(key, newValue).onComplete((res, exc) -> {
lock.unlockAsync(threadId);
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(newValue);
});
return;
}
lock.unlockAsync(threadId);
result.trySuccess(newValue);
});
});
});
return result;
}
@Override
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
checkKey(key);
Objects.requireNonNull(remappingFunction);
RLock lock = getLock(key);
lock.lock();
try {
V oldValue = get(key);
V newValue = remappingFunction.apply(key, oldValue);
if (newValue == null) {
if (oldValue != null) {
fastRemove(key);
}
} else {
fastPut(key, newValue);
}
return newValue;
} finally {
lock.unlock();
}
}
@Override
public RFuture<V> computeIfAbsentAsync(K key, Function<? super K, ? extends V> mappingFunction) {
checkKey(key);
Objects.requireNonNull(mappingFunction);
RLock lock = getLock(key);
RPromise<V> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
lock.lockAsync(threadId).onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
RFuture<V> oldValueFuture = getAsync(key);
oldValueFuture.onComplete((oldValue, ex) -> {
if (ex != null) {
lock.unlockAsync(threadId);
result.tryFailure(ex);
return;
}
if (oldValue != null) {
lock.unlockAsync(threadId);
result.trySuccess(oldValue);
return;
}
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = mappingFunction.apply(key);
} catch (Exception exception) {
lock.unlockAsync(threadId);
result.tryFailure(exception);
return;
}
if (newValue != null) {
fastPutAsync(key, newValue).onComplete((res, exc) -> {
lock.unlockAsync(threadId);
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(newValue);
});
return;
}
lock.unlockAsync(threadId);
result.trySuccess(null);
});
});
});
return result;
}
@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
checkKey(key);
Objects.requireNonNull(mappingFunction);
RLock lock = getLock(key);
lock.lock();
try {
V value = get(key);
if (value == null) {
V newValue = mappingFunction.apply(key);
if (newValue != null) {
fastPut(key, newValue);
return newValue;
}
return null;
}
return value;
} finally {
lock.unlock();
}
}
@Override
public RFuture<V> computeIfPresentAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
checkKey(key);
Objects.requireNonNull(remappingFunction);
RLock lock = getLock(key);
RPromise<V> result = new RedissonPromise<>();
long threadId = Thread.currentThread().getId();
lock.lockAsync(threadId).onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
RFuture<V> oldValueFuture = getAsync(key);
oldValueFuture.onComplete((oldValue, ex) -> {
if (ex != null) {
lock.unlockAsync(threadId);
result.tryFailure(e);
return;
}
if (oldValue == null) {
lock.unlockAsync(threadId);
result.trySuccess(null);
return;
}
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
V newValue;
try {
newValue = remappingFunction.apply(key, oldValue);
} catch (Exception exception) {
lock.unlockAsync(threadId);
result.tryFailure(exception);
return;
}
if (newValue != null) {
RFuture<Boolean> replaceFuture = replaceAsync(key, oldValue, newValue);
replaceFuture.onComplete((re, ex1) -> {
lock.unlockAsync(threadId);
if (ex1 != null) {
result.tryFailure(ex1);
return;
}
if (re) {
result.trySuccess(newValue);
} else {
result.trySuccess(oldValue);
}
});
} else if (remove(key, oldValue)) {
RFuture<Boolean> removeFuture = removeAsync(key, oldValue);
removeFuture.onComplete((re, ex1) -> {
lock.unlockAsync(threadId);
if (ex1 != null) {
result.tryFailure(ex1);
return;
}
if (re) {
result.trySuccess(null);
} else {
result.trySuccess(oldValue);
}
});
}
});
});
});
return result;
}
@Override
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
checkKey(key);
Objects.requireNonNull(remappingFunction);
RLock lock = getLock(key);
lock.lock();
try {
V oldValue = get(key);
if (oldValue == null) {
return null;
}
V newValue = remappingFunction.apply(key, oldValue);
if (newValue != null) {
if (replace(key, oldValue, newValue)) {
return newValue;
}
} else if (remove(key, oldValue)) {
return null;
}
return oldValue;
} finally {
lock.unlock();
}
}
@Override
public RFuture<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName());

@ -22,6 +22,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* Async interface for Redis based implementation
@ -37,6 +39,49 @@ import java.util.Set;
*/
public interface RMapAsync<K, V> extends RExpirableAsync {
/**
* Associates specified key with the given value if key isn't already associated with a value.
* Otherwise, replaces the associated value with the results of the given
* remapping function, or removes if the result is {@code null}.
*
* @param key - map key
* @param value - value to be merged with the existing value
* associated with the key or to be associated with the key,
* if no existing value
* @param remappingFunction - the function is invoked with the existing value to compute new value
* @return new value associated with the specified key or
* {@code null} if no value associated with the key
*/
RFuture<V> mergeAsync(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction);
/**
* Computes a new mapping for the specified key and its current mapped value.
*
* @param key - map key
* @param remappingFunction - function to compute a value
* @return the new value associated with the specified key, or {@code null} if none
*/
RFuture<V> computeAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);
/**
* Computes a mapping for the specified key if it's not mapped before.
*
* @param key - map key
* @param mappingFunction - function to compute a value
* @return current or new computed value associated with
* the specified key, or {@code null} if the computed value is null
*/
RFuture<V> computeIfAbsentAsync(K key, Function<? super K, ? extends V> mappingFunction);
/**
* Computes a mapping for the specified key only if it's already mapped.
*
* @param key - map key
* @param remappingFunction - function to compute a value
* @return the new value associated with the specified key, or null if none
*/
RFuture<V> computeIfPresentAsync(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);
/**
* Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}.
*

@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* Reactive interface for Redis based implementation
@ -39,6 +41,49 @@ import java.util.Set;
*/
public interface RMapReactive<K, V> extends RExpirableReactive {
/**
* Associates specified key with the given value if key isn't already associated with a value.
* Otherwise, replaces the associated value with the results of the given
* remapping function, or removes if the result is {@code null}.
*
* @param key - map key
* @param value - value to be merged with the existing value
* associated with the key or to be associated with the key,
* if no existing value
* @param remappingFunction - the function is invoked with the existing value to compute new value
* @return new value associated with the specified key or
* {@code null} if no value associated with the key
*/
Mono<V> merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction);
/**
* Computes a new mapping for the specified key and its current mapped value.
*
* @param key - map key
* @param remappingFunction - function to compute a value
* @return the new value associated with the specified key, or {@code null} if none
*/
Mono<V> compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);
/**
* Computes a mapping for the specified key if it's not mapped before.
*
* @param key - map key
* @param mappingFunction - function to compute a value
* @return current or new computed value associated with
* the specified key, or {@code null} if the computed value is null
*/
Mono<V> computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction);
/**
* Computes a mapping for the specified key only if it's already mapped.
*
* @param key - map key
* @param remappingFunction - function to compute a value
* @return the new value associated with the specified key, or null if none
*/
Mono<V> computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);
/**
* Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}.
*

@ -19,6 +19,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
@ -42,6 +44,49 @@ import io.reactivex.Single;
*/
public interface RMapRx<K, V> extends RExpirableRx {
/**
* Associates specified key with the given value if key isn't already associated with a value.
* Otherwise, replaces the associated value with the results of the given
* remapping function, or removes if the result is {@code null}.
*
* @param key - map key
* @param value - value to be merged with the existing value
* associated with the key or to be associated with the key,
* if no existing value
* @param remappingFunction - the function is invoked with the existing value to compute new value
* @return new value associated with the specified key or
* {@code null} if no value associated with the key
*/
Maybe<V> merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction);
/**
* Computes a new mapping for the specified key and its current mapped value.
*
* @param key - map key
* @param remappingFunction - function to compute a value
* @return the new value associated with the specified key, or {@code null} if none
*/
Maybe<V> compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);
/**
* Computes a mapping for the specified key if it's not mapped before.
*
* @param key - map key
* @param mappingFunction - function to compute a value
* @return current or new computed value associated with
* the specified key, or {@code null} if the computed value is null
*/
Maybe<V> computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction);
/**
* Computes a mapping for the specified key only if it's already mapped.
*
* @param key - map key
* @param remappingFunction - function to compute a value
* @return the new value associated with the specified key, or null if none
*/
Maybe<V> computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction);
/**
* Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}.
*

@ -143,7 +143,72 @@ public abstract class BaseMapTest extends BaseTest {
((RDestroyable) map).destroy();
}
}
@Test
public void testComputeIfPresent() {
RMap<String, String> map = getMap("map");
map.computeIfPresent("1", (key, value) -> {
return "12";
});
assertThat(map.get("1")).isNull();
map.put("1", "10");
map.computeIfPresent("1", (key, value) -> {
assertThat(value).isEqualTo("10");
return "12";
});
assertThat(map.get("1")).isEqualTo("12");
}
@Test
public void testComputeIfAbsent() {
RMap<String, String> map = getMap("map");
map.computeIfAbsent("1", (key) -> {
assertThat(key).isEqualTo("1");
return "12";
});
assertThat(map.get("1")).isEqualTo("12");
map.computeIfAbsent("1", (key) -> {
assertThat(key).isEqualTo("1");
return "13";
});
assertThat(map.get("1")).isEqualTo("12");
}
@Test
public void testMerge() {
RMap<String, String> map = getMap("map");
map.merge("1", "2", (key, oldValue) -> {
return "12";
});
assertThat(map.get("1")).isEqualTo("2");
map.merge("1", "2", (key, oldValue) -> {
return "12";
});
assertThat(map.get("1")).isEqualTo("12");
map.merge("1", "2", (key, oldValue) -> {
return null;
});
assertThat(map.get("1")).isNull();
}
@Test
public void testCompute() {
RMap<String, String> map = getMap("map");
map.compute("1", (key, oldValue) -> {
return "12";
});
assertThat(map.get("1")).isEqualTo("12");
map.compute("1", (key, oldValue) -> {
return null;
});
assertThat(map.get("1")).isNull();
}
@Test
public void testGetAllWithStringKeys() {
RMap<String, Integer> map = getMap("getAllStrings");

Loading…
Cancel
Save