Merge branch 'master' into feature_json_mget

pull/6051/head
seakider 7 months ago
commit d418fb648e

@ -182,7 +182,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>4.1.111.Final</version>
<version>4.1.112.Final</version>
<type>pom</type>
<scope>import</scope>
</dependency>

@ -15,8 +15,8 @@
*/
package org.redisson;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class BooleanSlotCallback implements SlotCallback<Boolean, Boolean> {
private final AtomicBoolean r = new AtomicBoolean();
private final Object[] params;
public BooleanSlotCallback() {
@ -38,15 +36,8 @@ public class BooleanSlotCallback implements SlotCallback<Boolean, Boolean> {
}
@Override
public void onSlotResult(List<Object> keys, Boolean res) {
if (res) {
r.set(true);
}
}
@Override
public Boolean onFinish() {
return r.get();
public Boolean onResult(Collection<Boolean> res) {
return res.contains(true);
}
@Override

@ -15,8 +15,8 @@
*/
package org.redisson;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class IntegerSlotCallback implements SlotCallback<Integer, Integer> {
private final AtomicInteger results = new AtomicInteger();
private final Object[] params;
public IntegerSlotCallback() {
@ -38,13 +36,8 @@ public class IntegerSlotCallback implements SlotCallback<Integer, Integer> {
}
@Override
public void onSlotResult(List<Object> keys, Integer result) {
results.addAndGet(result);
}
@Override
public Integer onFinish() {
return results.get();
public Integer onResult(Collection<Integer> result) {
return result.stream().mapToInt(r -> r).sum();
}
@Override

@ -15,8 +15,8 @@
*/
package org.redisson;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
*
@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class LongSlotCallback implements SlotCallback<Long, Long> {
private final AtomicLong results = new AtomicLong();
private final Object[] params;
public LongSlotCallback() {
@ -38,13 +36,8 @@ public class LongSlotCallback implements SlotCallback<Long, Long> {
}
@Override
public void onSlotResult(List<Object> keys, Long result) {
results.addAndGet(result);
}
@Override
public Long onFinish() {
return results.get();
public Long onResult(Collection<Long> result) {
return result.stream().mapToLong(r -> r).sum();
}
@Override

@ -29,8 +29,6 @@ import org.redisson.misc.CompletableFutureWrapper;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
@ -82,23 +80,18 @@ public class RedissonBuckets implements RBuckets {
Codec commandCodec = new CompositeCodec(StringCodec.INSTANCE, codec, codec);
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("MGET", new MapGetAllDecoder(keysList, 0));
RedisCommand<Map<Object, Object>> command = new RedisCommand<>("MGET", new MapGetAllDecoder(keysList, 0));
return commandExecutor.readBatchedAsync(commandCodec, command, new SlotCallback<Map<Object, Object>, Map<String, V>>() {
final Map<String, V> results = new ConcurrentHashMap<>();
@Override
public void onSlotResult(List<Object> keys, Map<Object, Object> result) {
for (Map.Entry<Object, Object> entry : result.entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) entry.getKey());
results.put(key, (V) entry.getValue());
}
}
}
@Override
public Map<String, V> onFinish() {
return results;
public Map<String, V> onResult(Collection<Map<Object, Object>> result) {
return result.stream()
.flatMap(c -> c.entrySet().stream())
.filter(e -> e.getKey() != null && e.getValue() != null)
.map(e -> {
String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) e.getKey());
return new AbstractMap.SimpleEntry<>(key, (V) e.getValue());
}).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}
@Override
@ -116,21 +109,7 @@ public class RedissonBuckets implements RBuckets {
Map<String, ?> mappedBuckets = map(buckets);
return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSETNX, new SlotCallback<Boolean, Boolean>() {
final AtomicBoolean result = new AtomicBoolean(true);
@Override
public void onSlotResult(List<Object> keys, Boolean result) {
if (!result && this.result.get()){
this.result.set(result);
}
}
@Override
public Boolean onFinish() {
return this.result.get();
}
return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSETNX, new BooleanSlotCallback() {
@Override
public Object[] createParams(List<Object> keys) {
List<Object> params = new ArrayList<>(keys.size());
@ -161,16 +140,7 @@ public class RedissonBuckets implements RBuckets {
Map<String, ?> mappedBuckets = map(buckets);
return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new SlotCallback<Void, Void>() {
@Override
public void onSlotResult(List<Object> keys, Void result) {
}
@Override
public Void onFinish() {
return null;
}
return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new VoidSlotCallback() {
@Override
public Object[] createParams(List<Object> keys) {
List<Object> params = new ArrayList<>(keys.size());

@ -192,8 +192,8 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
for (Object member : members) {
encode(params, member);
}
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOHASH",
new MapGetAllDecoder((List<Object>) Arrays.asList(members), 0));
RedisCommand<Map<Object, Object>> command = new RedisCommand<>("GEOHASH",
new MapGetAllDecoder(Arrays.asList(members), 0));
return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, command, params.toArray());
}

@ -258,14 +258,14 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return oldValueFuture.thenCompose(oldValue -> {
return CompletableFuture.supplyAsync(() -> remappingFunction.apply(key, oldValue), getServiceManager().getExecutor())
.thenCompose(newValue -> {
if (newValue != null) {
if (newValue == null) {
if (oldValue != null) {
return fastPutAsync(key, newValue)
return fastRemoveAsync(key)
.thenApply(rr -> newValue);
}
return CompletableFuture.completedFuture(newValue);
}
return fastRemoveAsync(key)
return fastPutAsync(key, newValue)
.thenApply(rr -> newValue);
});
}).whenComplete((c, e) -> {

@ -20,8 +20,10 @@ import org.redisson.api.listener.MapExpiredListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapNativeAllDecoder;
import org.redisson.misc.CompletableFutureWrapper;
import java.time.Duration;
@ -245,6 +247,26 @@ public class RedissonMapCacheNative<K, V> extends RedissonMap<K, V> implements R
return commandExecutor.readAsync(name, StringCodec.INSTANCE, RedisCommands.HPTTL, name, "FIELDS", 1, encodeMapKey(key));
}
@Override
public Map<K, Long> remainTimeToLive(Set<K> keys) {
return get(remainTimeToLiveAsync(keys));
}
@Override
public RFuture<Map<K, Long>> remainTimeToLiveAsync(Set<K> keys) {
List<Object> plainKeys = new ArrayList<>(keys);
List<Object> params = new ArrayList<>(keys.size() + 1);
params.add(getRawName());
params.add("FIELDS");
params.add(plainKeys.size());
encodeMapKeys(params, plainKeys);
RedisCommand<Map<Object, Object>> command = new RedisCommand<>("HPTTL",
new MapNativeAllDecoder(plainKeys, Long.class));
return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, command, params.toArray());
}
@Override
public void putAll(Map<? extends K, ? extends V> map, Duration ttl) {
get(putAllAsync(map, ttl));
@ -412,14 +434,34 @@ public class RedissonMapCacheNative<K, V> extends RedissonMap<K, V> implements R
}
@Override
public boolean clearExpire(K key) {
public Boolean clearExpire(K key) {
return get(clearExpireAsync(key));
}
@Override
public RFuture<Boolean> clearExpireAsync(K key) {
String name = getRawName(key);
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.HPERSIST, name, encodeMapKey(key));
return commandExecutor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.HPERSIST, name, "FIELDS", 1, encodeMapKey(key));
}
@Override
public Map<K, Boolean> clearExpire(Set<K> keys) {
return get(clearExpireAsync(keys));
}
@Override
public RFuture<Map<K, Boolean>> clearExpireAsync(Set<K> keys) {
List<Object> plainKeys = new ArrayList<>(keys);
List<Object> params = new ArrayList<>(keys.size() + 1);
params.add(getRawName());
params.add("FIELDS");
params.add(plainKeys.size());
encodeMapKeys(params, plainKeys);
RedisCommand<Map<Object, Object>> command = new RedisCommand<>("HPERSIST",
new MapNativeAllDecoder(plainKeys, Boolean.class));
return commandExecutor.readAsync(getRawName(), StringCodec.INSTANCE, command, params.toArray());
}
@Override

@ -26,6 +26,7 @@ import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -264,4 +265,79 @@ public class RedissonScript implements RScript {
return commandExecutor.get(evalAsync(key, mode, luaScript, returnType, keys, values));
}
@Override
public <R> R eval(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values) {
return commandExecutor.get(evalAsync(mode, luaScript, returnType, resultMapper, values));
}
@Override
public <R> RFuture<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values) {
List<Object> args = new ArrayList<>();
args.add(luaScript);
args.add(0);
for (Object object : values) {
args.add(commandExecutor.encode(codec, object));
}
List<CompletableFuture<R>> futures;
if (mode == Mode.READ_ONLY) {
futures = commandExecutor.readAllAsync(codec, returnType.getCommand(), args.toArray());
} else {
futures = commandExecutor.writeAllAsync(codec, returnType.getCommand(), args.toArray());
}
CompletableFuture<Void> r = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> res = r.thenApply(v -> {
List<R> l = futures.stream().map(f -> f.join()).collect(Collectors.toList());
return resultMapper.apply(l);
});
return new CompletableFutureWrapper<>(res);
}
@Override
public <R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values) {
return commandExecutor.get(evalShaAsync(mode, shaDigest, returnType, resultMapper, values));
}
@Override
public <R> RFuture<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values) {
List<Object> args = new ArrayList<>();
args.add(shaDigest);
args.add(0);
for (Object object : values) {
args.add(commandExecutor.encode(codec, object));
}
if (mode == Mode.READ_ONLY && commandExecutor.isEvalShaROSupported()) {
RedisCommand cmd = new RedisCommand(returnType.getCommand(), "EVALSHA_RO");
List<CompletableFuture<R>> futures = commandExecutor.readAllAsync(codec, cmd, args.toArray());
CompletableFuture<Void> r = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> rr = r.handle((res, e) -> {
if (e != null) {
if (e.getMessage().startsWith("ERR unknown command")) {
commandExecutor.setEvalShaROSupported(false);
return evalShaAsync(mode, shaDigest, returnType, resultMapper, values);
}
CompletableFuture<R> ex = new CompletableFuture<>();
ex.completeExceptionally(e);
return ex;
}
List<R> l = futures.stream().map(f -> f.join()).collect(Collectors.toList());
R result = resultMapper.apply(l);
return CompletableFuture.completedFuture(result);
}).thenCompose(ff -> ff);
return new CompletableFutureWrapper<>(rr);
}
List<CompletableFuture<R>> futures = commandExecutor.readAllAsync(codec, returnType.getCommand(), args.toArray());
CompletableFuture<Void> r = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> res = r.thenApply(v -> {
List<R> l = futures.stream().map(f -> f.join()).collect(Collectors.toList());
return resultMapper.apply(l);
});
return new CompletableFutureWrapper<>(res);
}
}

@ -18,6 +18,7 @@ package org.redisson;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.MasterSlaveEntry;
import java.util.Collection;
import java.util.List;
/**
@ -41,8 +42,6 @@ public interface SlotCallback<T, R> {
return params.toArray();
}
void onSlotResult(List<Object> keys, T result);
R onFinish();
R onResult(Collection<T> result);
}

@ -15,8 +15,8 @@
*/
package org.redisson;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class VoidSlotCallback implements SlotCallback<Void, Void> {
private final AtomicBoolean r = new AtomicBoolean();
private final Object[] params;
public VoidSlotCallback() {
@ -38,11 +36,7 @@ public class VoidSlotCallback implements SlotCallback<Void, Void> {
}
@Override
public void onSlotResult(List<Object> keys, Void res) {
}
@Override
public Void onFinish() {
public Void onResult(Collection<Void> result) {
return null;
}

@ -18,6 +18,7 @@ package org.redisson.api;
import org.redisson.api.map.MapWriter;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
/**
@ -107,13 +108,24 @@ public interface RMapCacheNative<K, V> extends RMap<K, V>, RMapCacheNativeAsync<
/**
* Remaining time to live of map entry associated with a <code>key</code>.
*
* @param key - map key
* @param key map key
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
long remainTimeToLive(K key);
/**
* Remaining time to live of map entries associated with <code>keys</code>.
*
* @param keys map keys
* @return Time to live mapped by key.
* Time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Map<K, Long> remainTimeToLive(Set<K> keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
@ -127,13 +139,25 @@ public interface RMapCacheNative<K, V> extends RMap<K, V>, RMapCacheNativeAsync<
void putAll(java.util.Map<? extends K, ? extends V> map, Duration ttl);
/**
* Clear an expire timeout or expire date of specified entry by key.
* Clears an expiration timeout or date of specified entry by key.
*
* @param key map key
* @return <code>true</code> if timeout was removed
* <code>false</code> if object does not exist or does not have an associated timeout
* <code>false</code> if entry does not have an associated timeout
* <code>null</code> if entry does not exist
*/
Boolean clearExpire(K key);
/**
* Clears an expiration timeout or date of specified entries by keys.
*
* @param keys map keys
* @return Boolean mapped by key.
* <code>true</code> if timeout was removed
* <code>false</code> if entry does not have an associated timeout
* <code>null</code> if entry does not exist
*/
boolean clearExpire(K key);
Map<K, Boolean> clearExpire(Set<K> keys);
/**
* Updates time to live of specified entry by key.

@ -18,6 +18,7 @@ package org.redisson.api;
import org.redisson.api.map.MapWriter;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
/**
@ -114,6 +115,8 @@ public interface RMapCacheNativeAsync<K, V> extends RMapAsync<K, V> {
*/
RFuture<Long> remainTimeToLiveAsync(K key);
RFuture<Map<K, Long>> remainTimeToLiveAsync(Set<K> keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
@ -127,14 +130,26 @@ public interface RMapCacheNativeAsync<K, V> extends RMapAsync<K, V> {
RFuture<Void> putAllAsync(java.util.Map<? extends K, ? extends V> map, Duration ttl);
/**
* Clear an expire timeout or expire date of specified entry by key.
* Clears an expiration timeout or date of specified entry by key.
*
* @param key map key
* @return <code>true</code> if timeout was removed
* <code>false</code> if object does not exist or does not have an associated timeout
* <code>false</code> if entry does not have an associated timeout
* <code>null</code> if entry does not exist
*/
RFuture<Boolean> clearExpireAsync(K key);
/**
* Clears an expiration timeout or date of specified entries by keys.
*
* @param keys map keys
* @return Boolean mapped by key.
* <code>true</code> if timeout was removed
* <code>false</code> if entry does not have an associated timeout
* <code>null</code> if entry does not exist
*/
RFuture<Map<K, Boolean>> clearExpireAsync(Set<K> keys);
/**
* Updates time to live and max idle time of specified entry by key.
* Entry expires when specified time to live was reached.

@ -19,6 +19,7 @@ import org.redisson.api.map.MapWriter;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
/**
@ -107,13 +108,24 @@ public interface RMapCacheNativeReactive<K, V> extends RMapReactive<K, V>, RDest
/**
* Remaining time to live of map entry associated with a <code>key</code>.
*
* @param key - map key
* @param key map key
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Mono<Long> remainTimeToLive(K key);
/**
* Remaining time to live of map entries associated with <code>keys</code>.
*
* @param keys map keys
* @return Time to live mapped by key.
* Time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Mono<Map<K, Long>> remainTimeToLive(Set<K> keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
@ -127,14 +139,26 @@ public interface RMapCacheNativeReactive<K, V> extends RMapReactive<K, V>, RDest
Mono<Void> putAll(java.util.Map<? extends K, ? extends V> map, Duration ttl);
/**
* Clear an expire timeout or expire date of specified entry by key.
* Clears an expiration timeout or date of specified entry by key.
*
* @param key map key
* @return <code>true</code> if timeout was removed
* <code>false</code> if object does not exist or does not have an associated timeout
* <code>false</code> if entry does not have an associated timeout
* <code>null</code> if entry does not exist
*/
Mono<Boolean> clearExpire(K key);
/**
* Clears an expiration timeout or date of specified entries by keys.
*
* @param keys map keys
* @return Boolean mapped by key.
* <code>true</code> if timeout was removed
* <code>false</code> if entry does not have an associated timeout
* <code>null</code> if entry does not exist
*/
Mono<Map<K, Boolean>> clearExpire(Set<K> keys);
/**
* Updates time to live of specified entry by key.
* Entry expires when specified time to live was reached.

@ -21,6 +21,7 @@ import io.reactivex.rxjava3.core.Single;
import org.redisson.api.map.MapWriter;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
/**
@ -110,13 +111,24 @@ public interface RMapCacheNativeRx<K, V> extends RMapRx<K, V>, RDestroyable {
/**
* Remaining time to live of map entry associated with a <code>key</code>.
*
* @param key - map key
* @param key map key
* @return time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Single<Long> remainTimeToLive(K key);
/**
* Remaining time to live of map entries associated with <code>keys</code>.
*
* @param keys map keys
* @return Time to live mapped by key.
* Time in milliseconds
* -2 if the key does not exist.
* -1 if the key exists but has no associated expire.
*/
Single<Map<K, Long>> remainTimeToLive(Set<K> keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
@ -130,13 +142,25 @@ public interface RMapCacheNativeRx<K, V> extends RMapRx<K, V>, RDestroyable {
Completable putAll(java.util.Map<? extends K, ? extends V> map, Duration ttl);
/**
* Clear an expire timeout or expire date of specified entry by key.
* Clears an expiration timeout or date of specified entry by key.
*
* @param key map key
* @return <code>true</code> if timeout was removed
* <code>false</code> if object does not exist or does not have an associated timeout
* <code>false</code> if entry does not have an associated timeout
* <code>null</code> if entry does not exist
*/
Maybe<Boolean> clearExpire(K key);
/**
* Clears an expiration timeout or date of specified entries by keys.
*
* @param keys map keys
* @return Boolean mapped by key.
* <code>true</code> if timeout was removed
* <code>false</code> if entry does not have an associated timeout
* <code>null</code> if entry does not exist
*/
Single<Boolean> clearExpire(K key);
Single<Map<K, Boolean>> clearExpire(Set<K> keys);
/**
* Updates time to live of specified entry by key.

@ -18,10 +18,12 @@ package org.redisson.api;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
/**
* Interface for Redis Script feature
* API for Redis Lua scripts execution.
*
* @author Nikita Koksharov
*
@ -69,11 +71,26 @@ public interface RScript extends RScriptAsync {
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script stored in Redis scripts cache by SHA-1 digest <code>shaDigest</code>.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script stored in Redis scripts cache by SHA-1 digest
*
@ -83,7 +100,7 @@ public interface RScript extends RScriptAsync {
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
@ -102,17 +119,32 @@ public interface RScript extends RScriptAsync {
/**
* Executes Lua script
*
* @param <R> - type of result
* @param key - used to locate Redis node in Cluster which stores cached Lua script
* @param key - used to locate Redis node in Cluster which stores cached Lua script
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> R eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> R eval(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script
*
@ -121,7 +153,7 @@ public interface RScript extends RScriptAsync {
* @param luaScript - lua script
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> R eval(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);

@ -18,7 +18,9 @@ package org.redisson.api;
import org.redisson.api.RScript.Mode;
import org.redisson.api.RScript.ReturnType;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
/**
* Async interface for Redis Script feature
@ -43,10 +45,25 @@ public interface RScriptAsync {
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> RFuture<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script stored in Redis scripts cache by SHA-1 digest <code>shaDigest</code>.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> RFuture<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script stored in Redis scripts cache by SHA-1 digest
@ -57,7 +74,7 @@ public interface RScriptAsync {
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> RFuture<R> evalShaAsync(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
@ -81,11 +98,26 @@ public interface RScriptAsync {
* @param luaScript - lua script
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> RFuture<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> RFuture<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script
*
@ -95,7 +127,7 @@ public interface RScriptAsync {
* @param luaScript - lua script
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> RFuture<R> evalAsync(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);

@ -19,7 +19,9 @@ import org.redisson.api.RScript.Mode;
import org.redisson.api.RScript.ReturnType;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
/**
* Reactive interface for Redis Script feature
@ -44,7 +46,7 @@ public interface RScriptReactive {
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> Mono<R> evalSha(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
@ -58,11 +60,11 @@ public interface RScriptReactive {
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> Mono<R> evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes Lua script stored in Redis scripts cache by SHA-1 digest
*
@ -74,6 +76,21 @@ public interface RScriptReactive {
*/
<R> Mono<R> evalSha(Mode mode, String shaDigest, ReturnType returnType);
/**
* Executes a Lua script stored in Redis scripts cache by SHA-1 digest <code>shaDigest</code>.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> Mono<R> evalSha(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script
*
@ -82,7 +99,7 @@ public interface RScriptReactive {
* @param luaScript - lua script
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> Mono<R> eval(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
@ -107,11 +124,26 @@ public interface RScriptReactive {
* @param luaScript - lua script
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> Mono<R> eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> Mono<R> eval(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Loads Lua script into Redis scripts cache and returns its SHA-1 digest
*

@ -15,7 +15,9 @@
*/
package org.redisson.api;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import org.redisson.api.RScript.Mode;
import org.redisson.api.RScript.ReturnType;
@ -47,7 +49,7 @@ public interface RScriptRx {
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> Maybe<R> evalSha(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
@ -61,11 +63,11 @@ public interface RScriptRx {
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> Maybe<R> evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes Lua script stored in Redis scripts cache by SHA-1 digest
*
@ -77,6 +79,21 @@ public interface RScriptRx {
*/
<R> Maybe<R> evalSha(Mode mode, String shaDigest, ReturnType returnType);
/**
* Executes a Lua script stored in Redis scripts cache by SHA-1 digest <code>shaDigest</code>.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> Maybe<R> evalSha(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script
*
@ -85,7 +102,7 @@ public interface RScriptRx {
* @param luaScript - lua script
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> Maybe<R> eval(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
@ -110,11 +127,26 @@ public interface RScriptRx {
* @param luaScript - lua script
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @param values - values available through ARGV param in script
* @return result object
*/
<R> Maybe<R> eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> Maybe<R> eval(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Loads Lua script into Redis scripts cache and returns its SHA-1 digest
*

@ -506,7 +506,17 @@ public interface RedisCommands {
RedisStrictCommand<Void> MSET = new RedisStrictCommand<Void>("MSET", new VoidReplayConvertor());
RedisStrictCommand<Boolean> MSETNX = new RedisStrictCommand<Boolean>("MSETNX", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> HPERSIST = new RedisStrictCommand<Boolean>("HPERSIST", new BooleanReplayConvertor());
RedisCommand<Boolean> HPERSIST = new RedisCommand("HPERSIST", new ListFirstObjectDecoder(), new Convertor<Boolean>() {
@Override
public Boolean convert(Object obj) {
Long val = (Long) obj;
if (val == -2) {
return null;
}
return val == 1;
}
});
RedisCommand<Long> HPTTL = new RedisCommand("HPTTL", new ListFirstObjectDecoder(), new LongReplayConvertor());
RedisStrictCommand<Boolean> HSETNX = new RedisStrictCommand<Boolean>("HSETNX", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> HSET = new RedisStrictCommand<Boolean>("HSET", new BooleanReplayConvertor());

@ -89,6 +89,8 @@ public interface CommandAsyncExecutor {
<R> List<CompletableFuture<R>> writeAllAsync(RedisCommand<?> command, Object... params);
<R> List<CompletableFuture<R>> writeAllAsync(Codec codec, RedisCommand<?> command, Object... params);
<R> List<CompletableFuture<R>> readAllAsync(Codec codec, RedisCommand<?> command, Object... params);
<R> List<CompletableFuture<R>> readAllAsync(RedisCommand<?> command, Object... params);

@ -292,17 +292,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T> RFuture<Void> writeAllVoidAsync(RedisCommand<T> command, Object... params) {
List<CompletableFuture<Void>> futures = writeAllAsync(command, StringCodec.INSTANCE, params);
List<CompletableFuture<Void>> futures = writeAllAsync(StringCodec.INSTANCE, command, params);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
@Override
public <R> List<CompletableFuture<R>> writeAllAsync(RedisCommand<?> command, Object... params) {
return writeAllAsync(command, codec, params);
return writeAllAsync(codec, command, params);
}
private <R> List<CompletableFuture<R>> writeAllAsync(RedisCommand<?> command, Codec codec, Object... params) {
@Override
public <R> List<CompletableFuture<R>> writeAllAsync(Codec codec, RedisCommand<?> command, Object... params) {
List<CompletableFuture<R>> futures = connectionManager.getEntrySet().stream().map(e -> {
RFuture<R> f = async(false, new NodeSource(e),
codec, command, params, true, false);
@ -506,7 +507,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
public void setEvalShaROSupported(boolean value) {
this.EVAL_SHA_RO_SUPPORTED.set(value);
EVAL_SHA_RO_SUPPORTED.set(value);
}
private static final Pattern COMMANDS_PATTERN = Pattern.compile("redis\\.call\\(['\"]{1}([\\w.]+)['\"]{1}");
@ -729,7 +730,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}, Collectors.toList())));
}
Map<List<Object>, CompletableFuture<?>> futures = new IdentityHashMap<>();
List<CompletableFuture<?>> futures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error
CommandBatchService executorService;
@ -747,13 +748,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
Object[] keysArray = callback.createKeys(entry.getKey(), groupedKeys);
Object[] paramsArray = callback.createParams(Collections.emptyList());
RFuture<T> f;
if (readOnly) {
RFuture<T> f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.put(groupedKeys, f.toCompletableFuture());
f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
} else {
RFuture<T> f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
futures.put(groupedKeys, f.toCompletableFuture());
f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray);
}
futures.add(f.toCompletableFuture());
}
if (!(this instanceof CommandBatchService)) {
@ -761,12 +762,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> result = future.thenApply(r -> {
futures.entrySet().forEach(e -> {
callback.onSlotResult(e.getKey(), (T) e.getValue().join());
});
return callback.onFinish();
List<T> res = futures.stream()
.map(e -> (T) e.join())
.collect(Collectors.toList());
return callback.onResult(res);
});
return new CompletableFutureWrapper<>(result);
@ -802,7 +803,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}, Collectors.toList())));
Map<List<Object>, CompletableFuture<?>> futures = new IdentityHashMap<>();
List<CompletableFuture<?>> futures = new ArrayList<>();
List<CompletableFuture<?>> mainFutures = new ArrayList<>();
for (Entry<MasterSlaveEntry, Map<Integer, List<Object>>> entry : entry2keys.entrySet()) {
// executes in batch due to CROSSLOT error
@ -820,13 +821,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
c = newCommand;
}
Object[] params = callback.createParams(groupedKeys);
RFuture<T> f;
if (readOnly) {
RFuture<T> f = executorService.readAsync(entry.getKey(), codec, c, params);
futures.put(groupedKeys, f.toCompletableFuture());
f = executorService.readAsync(entry.getKey(), codec, c, params);
} else {
RFuture<T> f = executorService.writeAsync(entry.getKey(), codec, c, params);
futures.put(groupedKeys, f.toCompletableFuture());
f = executorService.writeAsync(entry.getKey(), codec, c, params);
}
futures.add(f.toCompletableFuture());
}
if (!(this instanceof CommandBatchService)) {
@ -839,15 +840,15 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (!mainFutures.isEmpty()) {
future = CompletableFuture.allOf(mainFutures.toArray(new CompletableFuture[0]));
} else {
future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
CompletableFuture<R> result = future.thenApply(r -> {
futures.entrySet().forEach(e -> {
if (!e.getValue().isCompletedExceptionally() && e.getValue().getNow(null) != null) {
callback.onSlotResult(e.getKey(), (T) e.getValue().getNow(null));
}
});
return callback.onFinish();
List<T> res = futures.stream()
.filter(e -> !e.isCompletedExceptionally() && e.getNow(null) != null)
.map(e -> (T) e.join())
.collect(Collectors.toList());
return callback.onResult(res);
});
return new CompletableFutureWrapper<>(result);

@ -491,7 +491,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void shutdown() {
shutdown(2, 10, TimeUnit.SECONDS); //default netty value
shutdown(0, 10, TimeUnit.SECONDS); //default netty value
}
@Override

@ -0,0 +1,61 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection.decoder;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class MapNativeAllDecoder implements MultiDecoder<Map<Object, Object>> {
private final List<Object> args;
private final Class<?> valueClass;
public MapNativeAllDecoder(List<Object> args, Class<?> valueClass) {
this.args = args;
this.valueClass = valueClass;
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return new HashMap<>();
}
Map<Object, Object> result = new LinkedHashMap<>(parts.size());
for (int index = 0; index < parts.size(); index++) {
Long value = (Long) parts.get(index);
if (value == -2 && valueClass != Long.class) {
continue;
}
if (valueClass == Boolean.class) {
result.put(args.get(index), value == 1);
} else {
result.put(args.get(index), value);
}
}
return result;
}
}

@ -119,7 +119,11 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
value.getPromise().completeExceptionally(e);
return;
}
value.getPromise().complete(value);
if (!value.getPromise().complete(value)) {
if (value.getPromise().isCompletedExceptionally()) {
entries.remove(entryName);
}
}
});
});

@ -269,14 +269,43 @@ public abstract class BaseMapTest extends RedisDockerTest {
@Test
public void testCompute() {
RMap<String, String> map = getMap("map");
map.compute("1", (key, oldValue) -> {
return "12";
});
assertThat(map.get("1")).isEqualTo("12");
map.compute("1", (key, oldValue) -> {
return (oldValue == null) ? "12" : oldValue.concat("34");
});
assertThat(map.get("1")).isEqualTo("1234");
map.compute("1", (key, oldValue) -> {
return null;
});
assertThat(map.get("1")).isNull();
}
@Test
public void testComputeAsync() {
RMap<String, String> map = getMap("mapAsync");
RFuture<String> res1 = map.computeAsync("1", (key, oldValue) -> {
return "12";
});
assertThat(res1.toCompletableFuture().join()).isEqualTo("12");
assertThat(map.get("1")).isEqualTo("12");
RFuture<String> res2 = map.computeAsync("1", (key, oldValue) -> {
return (oldValue == null) ? "12" : oldValue.concat("34");
});
assertThat(res2.toCompletableFuture().join()).isEqualTo("1234");
assertThat(map.get("1")).isEqualTo("1234");
RFuture<String> res3 = map.computeAsync("1", (key, oldValue) -> {
return null;
});
assertThat(res3.toCompletableFuture().join()).isNull();
assertThat(map.get("1")).isNull();
}

@ -216,7 +216,13 @@ public class RedissonMapCacheNativeTest extends BaseMapTest {
map.put("5", "6", Duration.ofSeconds(20));
assertThat(map.remainTimeToLive("1")).isLessThan(9900);
map.destroy();
Map<String, Long> r = map.remainTimeToLive(Set.of("0", "1", "3", "5", "6"));
assertThat(r.get("0")).isEqualTo(-2);
assertThat(r.get("1")).isGreaterThan(1);
assertThat(r.get("3")).isEqualTo(-1);
assertThat(r.get("5")).isGreaterThan(1);
assertThat(r.get("6")).isEqualTo(-2);
}
@Test
@ -384,6 +390,23 @@ public class RedissonMapCacheNativeTest extends BaseMapTest {
Assertions.assertEquals(0, cache.size());
cache.destroy();
}
@Test
public void testClear() {
RMapCacheNative<String, String> cache = redisson.getMapCacheNative("simple");
cache.put("0", "8", Duration.ofSeconds(1));
cache.put("02", "18", Duration.ofSeconds(1));
cache.put("03", "38", Duration.ofSeconds(1));
assertThat(cache.clearExpire("0")).isTrue();
assertThat(cache.clearExpire("01")).isNull();
Map<String, Boolean> r = cache.clearExpire(Set.of("0", "02", "03", "04"));
assertThat(r.get("0")).isFalse();
assertThat(r.get("02")).isTrue();
assertThat(r.get("03")).isTrue();
assertThat(r.get("04")).isNull();
}
@Test
public void testClearExpire() throws InterruptedException {

@ -102,7 +102,17 @@ public class RedissonScriptTest extends RedisDockerTest {
RFuture<List<Object>> res = script.evalAsync(RScript.Mode.READ_ONLY, "return {'1','2','3.3333','foo',nil,'bar'}", RScript.ReturnType.MULTI, Collections.emptyList());
assertThat(res.toCompletableFuture().join()).containsExactly("1", "2", "3.3333", "foo");
}
@Test
public void testEvalResultMapping() {
testInCluster(redissonClient -> {
RScript script = redissonClient.getScript(StringCodec.INSTANCE);
Long res = script.eval(RScript.Mode.READ_ONLY, "return 1;", RScript.ReturnType.INTEGER,
integers -> integers.stream().mapToLong(r -> r).sum());
assertThat(res).isEqualTo(3);
});
}
@Test
public void testScriptEncoding() {
RScript script = redisson.getScript();

@ -0,0 +1,79 @@
package org.redisson.pubsub;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Tested;
import org.junit.jupiter.api.Test;
import org.redisson.RedissonLockEntry;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.misc.AsyncSemaphore;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.*;
public class PublishSubscribeTest {
@Tested
private LockPubSub lockPubSub;
@Injectable
private PublishSubscribeService publishSubscribeService;
@Test
public void testSubscribeForRaceCondition() throws InterruptedException {
AtomicReference<CompletableFuture<PubSubConnectionEntry>> sRef = new AtomicReference<>();
new MockUp<PublishSubscribeService>() {
@Mock
AsyncSemaphore getSemaphore(ChannelName channelName) {
return new AsyncSemaphore(1);
}
@Mock
CompletableFuture<PubSubConnectionEntry> subscribeNoTimeout(
Codec codec, String channelName,
AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
sRef.set(new CompletableFuture<>());
return sRef.get();
}
};
CompletableFuture<RedissonLockEntry> newPromise = lockPubSub.subscribe(
"test", "redisson_lock__channel__test"
);
sRef.get().whenComplete((r, e) -> {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
});
Thread thread1 = new Thread(() -> sRef.get().complete(null));
Thread thread2 = new Thread(() -> newPromise.completeExceptionally(new RedisTimeoutException("test")));
thread1.start();
thread2.start();
thread1.join();
thread2.join();
assertTrue(newPromise.isCompletedExceptionally());
assertTrue(sRef.get().isDone());
assertFalse(sRef.get().isCompletedExceptionally());
CompletableFuture<RedissonLockEntry> secondPromise = lockPubSub.subscribe(
"test", "redisson_lock__channel__test"
);
Thread thread3 = new Thread(() -> secondPromise.complete(null));
thread3.start();
thread3.join();
assertTrue(secondPromise.isDone());
assertFalse(secondPromise.isCompletedExceptionally());
}
}
Loading…
Cancel
Save