Merge branch 'mrniko/master' into test-timeout

pull/616/head
jackygurui 9 years ago
commit a51f82fca0

@ -65,7 +65,7 @@ Articles
[Java Remote Method Invocation with Redisson](https://dzone.com/articles/java-remote-method-invocation-with-redisson)
[Java Multimaps With Redis](https://dzone.com/articles/multimaps-with-redis)
Easy to use. Quick start
Quick start
===============================
#### Maven

@ -15,13 +15,17 @@
*/
package org.redisson;
import java.math.BigDecimal;
import java.util.AbstractCollection;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
@ -30,31 +34,36 @@ import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.Cache;
import org.redisson.misc.Hash;
import org.redisson.misc.LFUCacheMap;
import org.redisson.misc.LRUCacheMap;
import org.redisson.misc.NoneCacheMap;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements RLocalCachedMap<K, V> {
public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements RLocalCachedMap<K, V> {
public static class LocalCachedMapClear {
@ -62,16 +71,22 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
public static class LocalCachedMapInvalidate {
private byte[] excludedId;
private byte[] keyHash;
public LocalCachedMapInvalidate() {
}
public LocalCachedMapInvalidate(byte[] keyHash) {
public LocalCachedMapInvalidate(byte[] excludedId, byte[] keyHash) {
super();
this.keyHash = keyHash;
this.excludedId = excludedId;
}
public byte[] getExcludedId() {
return excludedId;
}
public byte[] getKeyHash() {
return keyHash;
}
@ -162,12 +177,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
}
}
private static final RedisCommand<Set<Object>> ALL_KEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
private static final RedisCommand<Set<Entry<Object, Object>>> ALL_ENTRIES = new RedisCommand<Set<Entry<Object, Object>>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private byte[] id;
private RTopic<Object> invalidationTopic;
private RMap<K, V> map;
private Cache<CacheKey, CacheValue> cache;
private int invalidateEntryOnChange;
private int invalidationListenerId;
@ -183,7 +200,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
}
private void init(RedissonClient redisson, String name, LocalCachedMapOptions options) {
map = redisson.getMap(name);
id = generateId();
if (options.isInvalidateEntryOnChange()) {
invalidateEntryOnChange = 1;
@ -207,34 +224,17 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
cache.clear();
}
if (msg instanceof LocalCachedMapInvalidate) {
CacheKey key = new CacheKey(((LocalCachedMapInvalidate)msg).getKeyHash());
cache.remove(key);
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), id)) {
CacheKey key = new CacheKey(invalidateMsg.getKeyHash());
cache.remove(key);
}
}
}
});
}
}
@Override
public int size() {
return get(sizeAsync());
}
@Override
public RFuture<Integer> sizeAsync() {
return map.sizeAsync();
}
@Override
public boolean isEmpty() {
return map.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return get(containsKeyAsync(key));
}
private CacheKey toCacheKey(Object key) {
byte[] encoded = encodeMapKey(key);
return toCacheKey(encoded);
@ -248,30 +248,20 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
public RFuture<Boolean> containsKeyAsync(Object key) {
CacheKey cacheKey = toCacheKey(key);
if (!cache.containsKey(cacheKey)) {
return map.containsKeyAsync(key);
return super.containsKeyAsync(key);
}
return newSucceededFuture(true);
}
@Override
public boolean containsValue(Object value) {
return get(containsValueAsync(value));
}
@Override
public RFuture<Boolean> containsValueAsync(Object value) {
CacheValue cacheValue = new CacheValue(null, value);
if (!cache.containsValue(cacheValue)) {
return map.containsValueAsync(value);
return super.containsValueAsync(value);
}
return newSucceededFuture(true);
}
@Override
public V get(Object key) {
return get(getAsync(key));
}
@Override
public RFuture<V> getAsync(final Object key) {
if (key == null) {
@ -284,7 +274,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
return newSucceededFuture((V)cacheValue.getValue());
}
RFuture<V> future = map.getAsync((K)key);
RFuture<V> future = super.getAsync((K)key);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
@ -300,13 +290,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
});
return future;
}
@Override
public V put(K key, V value) {
return get(putAsync(key, value));
}
protected byte[] generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
ThreadLocalRandom.current().nextBytes(id);
return id;
}
@Override
public RFuture<V> putAsync(K key, V value) {
if (key == null) {
@ -318,7 +309,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
byte[] mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] msg = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
@ -331,11 +322,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
mapKey, encodeMapValue(value), msg, invalidateEntryOnChange);
}
@Override
public boolean fastPut(K key, V value) {
return get(fastPutAsync(key, value));
}
@Override
public RFuture<Boolean> fastPutAsync(K key, V value) {
if (key == null) {
@ -348,7 +334,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
byte[] encodedKey = encodeMapKey(key);
byte[] encodedValue = encodeMapKey(value);
CacheKey cacheKey = toCacheKey(encodedKey);
byte[] msg = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
@ -370,11 +356,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
}
}
@Override
public V remove(Object key) {
return get(removeAsync((K)key));
}
@Override
public RFuture<V> removeAsync(K key) {
if (key == null) {
@ -383,7 +364,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
byte[] keyEncoded = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
cache.remove(cacheKey);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
@ -396,30 +377,40 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
}
@Override
public boolean fastRemove(Object key) {
return get(fastRemoveAsync((K)key));
}
@Override
public RFuture<Boolean> fastRemoveAsync(K key) {
if (key == null) {
public RFuture<Long> fastRemoveAsync(K ... keys) {
if (keys == null) {
throw new NullPointerException();
}
byte[] keyEncoded = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
cache.remove(cacheKey);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[2]); "
+ "end; "
+ "return 1;"
List<Object> params = new ArrayList<Object>();
params.add(invalidateEntryOnChange);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
if (invalidateEntryOnChange == 1) {
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
params.add(msgEncoded);
} else {
params.add(null);
}
}
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0; " +
"for j = 2, #ARGV, 2 do "
+ "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then "
+ "if ARGV[1] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "end; "
+ "counter = counter + 1;"
+ "end;"
+ "end;"
+ "return 0;",
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
keyEncoded, msgEncoded, invalidateEntryOnChange);
params.toArray());
}
@ -432,17 +423,15 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
cacheMap.put(cacheKey, cacheValue);
}
cache.putAll(cacheMap);
map.putAll(m);
for (CacheKey cacheKey : cacheMap.keySet()) {
invalidationTopic.publish(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
super.putAll(m);
if (invalidateEntryOnChange == 1) {
for (CacheKey cacheKey : cacheMap.keySet()) {
invalidationTopic.publish(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
}
}
}
@Override
public void clear() {
delete();
}
@Override
public RFuture<Boolean> deleteAsync() {
cache.clear();
@ -523,7 +512,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
@Override
public Iterator<K> iterator() {
return new CompositeIterable<K>(cacheKeySetIterator(), map.keySet().iterator()) {
return new CompositeIterable<K>(cacheKeySetIterator(), RedissonLocalCachedMap.super.keySet().iterator()) {
@Override
boolean isCacheContains(Object object) {
@ -600,7 +589,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
public final Iterator<Map.Entry<K,V>> iterator() {
return new CompositeIterable<Map.Entry<K,V>>(cacheEntrySetIterator(), map.entrySet().iterator()) {
return new CompositeIterable<Map.Entry<K,V>>(cacheEntrySetIterator(), RedissonLocalCachedMap.super.entrySet().iterator()) {
@Override
boolean isCacheContains(Map.Entry<K,V> entry) {
@ -625,7 +614,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
Map.Entry<?,?> e = (Map.Entry<?,?>) o;
Object key = e.getKey();
Object value = e.getValue();
return RedissonLocalCachedMap.this.map.remove(key, value);
return RedissonLocalCachedMap.super.remove(key, value);
}
return false;
}
@ -696,5 +685,360 @@ public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements R
}
}
@Override
public RFuture<Map<K, V>> getAllAsync(Set<K> keys) {
final Map<K, V> result = new HashMap<K, V>();
Set<K> mapKeys = new HashSet<K>(keys);
for (Iterator<K> iterator = mapKeys.iterator(); iterator.hasNext();) {
K key = iterator.next();
CacheValue value = cache.get(key);
if (value != null) {
result.put(key, (V)value.getValue());
iterator.remove();
}
}
final RPromise<Map<K, V>> promise = newPromise();
RFuture<Map<K, V>> future = super.getAllAsync(mapKeys);
future.addListener(new FutureListener<Map<K, V>>() {
@Override
public void operationComplete(Future<Map<K, V>> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
Map<K, V> map = future.getNow();
result.putAll(map);
cacheMap(map);
promise.trySuccess(result);
}
});
return promise;
}
private void cacheMap(Map<?, ?> map) {
for (java.util.Map.Entry<?, ?> entry : map.entrySet()) {
byte[] mapKey = encodeMapKey(entry.getKey());
CacheKey cacheKey = toCacheKey(mapKey);
CacheValue cacheValue = new CacheValue(entry.getKey(), entry.getValue());
cache.put(cacheKey, cacheValue);
}
}
@Override
public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return newSucceededFuture(null);
}
List<Object> params = new ArrayList<Object>(map.size()*3);
List<Object> msgs = new ArrayList<Object>(map.size());
params.add(invalidateEntryOnChange);
params.add(map.size()*2);
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
byte[] mapKey = encodeMapKey(t.getKey());
byte[] mapValue = encodeMapValue(t.getValue());
params.add(mapKey);
params.add(mapValue);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
msgs.add(msgEncoded);
}
params.addAll(msgs);
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));"
+ "if ARGV[1] == '1' then "
+ "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do "
+ "redis.call('publish', KEYS[2], ARGV[i]); "
+ "end; "
+ "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)), params.toArray());
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
return;
}
cacheMap(map);
}
});
return future;
}
@Override
public RFuture<V> addAndGetAsync(final K key, Number value) {
final byte[] keyState = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyState);
byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
RFuture<V> future = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass())),
"local result = redis.call('HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end; "
+ "return result; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
return;
}
V value = future.getNow();
if (value != null) {
CacheKey cacheKey = toCacheKey(keyState);
cache.put(cacheKey, new CacheValue(key, value));
}
}
});
return future;
}
@Override
public RFuture<Boolean> fastPutIfAbsentAsync(final K key, final V value) {
RFuture<Boolean> future = super.fastPutIfAbsentAsync(key, value);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
return;
}
if (future.getNow()) {
CacheKey cacheKey = toCacheKey(key);
cache.put(cacheKey, new CacheValue(key, value));
}
}
});
return future;
}
@Override
public RFuture<Collection<V>> readAllValuesAsync() {
final List<V> result = new ArrayList<V>();
final List<Object> mapKeys = new ArrayList<Object>();
for (CacheValue value : cache.values()) {
mapKeys.add(encodeMapKey(value.getKey()));
result.add((V) value.getValue());
}
final RPromise<Collection<V>> promise = newPromise();
RFuture<Collection<V>> future = commandExecutor.evalReadAsync(getName(), codec, ALL_KEYS,
"local entries = redis.call('hgetall', KEYS[1]); "
+ "local result = {};"
+ "for j = 1, #entries, 2 do "
+ "local founded = false;"
+ "for i = 1, #ARGV, 1 do "
+ "if ARGV[i] == entries[j] then "
+ "founded = true;"
+ "end;"
+ "end; "
+ "if founded == false then "
+ "table.insert(result, entries[j+1]);"
+ "end;"
+ "end; "
+ "return result; ",
Arrays.<Object>asList(getName()),
mapKeys.toArray());
future.addListener(new FutureListener<Collection<V>>() {
@Override
public void operationComplete(Future<Collection<V>> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
result.addAll(future.get());
promise.trySuccess(result);
}
});
return promise;
}
@Override
public RFuture<Set<Entry<K, V>>> readAllEntrySetAsync() {
final Set<Entry<K, V>> result = new HashSet<Entry<K, V>>();
List<Object> mapKeys = new ArrayList<Object>();
for (CacheValue value : cache.values()) {
mapKeys.add(encodeMapKey(value.getKey()));
result.add(new AbstractMap.SimpleEntry<K, V>((K)value.getKey(), (V)value.getValue()));
}
final RPromise<Set<Entry<K, V>>> promise = newPromise();
RFuture<Set<Entry<K, V>>> future = commandExecutor.evalReadAsync(getName(), codec, ALL_ENTRIES,
"local entries = redis.call('hgetall', KEYS[1]); "
+ "local result = {};"
+ "for j = 1, #entries, 2 do "
+ "local founded = false;"
+ "for i = 1, #ARGV, 1 do "
+ "if ARGV[i] == entries[j] then "
+ "founded = true;"
+ "end;"
+ "end; "
+ "if founded == false then "
+ "table.insert(result, entries[j]);"
+ "table.insert(result, entries[j+1]);"
+ "end;"
+ "end; "
+ "return result; ",
Arrays.<Object>asList(getName()),
mapKeys.toArray());
future.addListener(new FutureListener<Set<Entry<K, V>>>() {
@Override
public void operationComplete(Future<Set<Entry<K, V>>> future) throws Exception {
if (!future.isSuccess()) {
return;
}
result.addAll(future.getNow());
promise.trySuccess(result);
}
});
return promise;
}
@Override
public RFuture<V> replaceAsync(final K key, final V value) {
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
RFuture<V> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end; "
+ "return v; "
+ "else "
+ "return nil; "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0)),
keyState, valueState, invalidateEntryOnChange, msg);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
return;
}
if (future.getNow() != null) {
CacheKey cacheKey = toCacheKey(key);
cache.put(cacheKey, new CacheValue(key, value));
}
}
});
return future;
}
@Override
public RFuture<Boolean> replaceAsync(final K key, V oldValue, final V newValue) {
final byte[] keyState = encodeMapKey(key);
byte[] oldValueState = encodeMapValue(oldValue);
byte[] newValueState = encodeMapValue(newValue);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[5]); "
+ "end; "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0)),
keyState, oldValueState, newValueState, invalidateEntryOnChange, msg);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
return;
}
if (future.getNow()) {
cache.put(cacheKey, new CacheValue(key, newValue));
}
}
});
return future;
}
@Override
public RFuture<Boolean> removeAsync(Object key, Object value) {
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash()));
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end; "
+ "return redis.call('hdel', KEYS[1], ARGV[1]) "
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0)),
keyState, valueState, invalidateEntryOnChange, msg);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
return;
}
if (future.getNow()) {
cache.remove(cacheKey);
}
}
});
return future;
}
@Override
public RFuture<V> putIfAbsentAsync(final K key, final V value) {
RFuture<V> future = super.putIfAbsentAsync(key, value);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
return;
}
if (future.getNow() == null) {
CacheKey cacheKey = toCacheKey(key);
cache.put(cacheKey, new CacheValue(key, value));
}
}
});
return future;
}
}

@ -38,7 +38,6 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -59,7 +58,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 4, ValueType.MAP);
static final RedisCommand<Boolean> EVAL_REMOVE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP);
static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
protected RedissonMap(CommandAsyncExecutor commandExecutor, String name) {
@ -244,11 +243,11 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public boolean remove(Object key, Object value) {
return get(removeAsync(key, value)) == 1;
return get(removeAsync(key, value));
}
@Override
public RFuture<Long> removeAsync(Object key, Object value) {
public RFuture<Boolean> removeAsync(Object key, Object value) {
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "return redis.call('hdel', KEYS[1], ARGV[1]) "

@ -34,7 +34,6 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
@ -42,9 +41,7 @@ import org.redisson.client.protocol.decoder.MapCacheScanResult;
import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ObjectListDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
@ -77,9 +74,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
private static final RedisCommand<Void> EVAL_HMSET = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 4, ValueType.MAP);
private static final RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapReplayDecoder(), new ObjectListReplayDecoder()), ValueType.MAP);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Boolean> EVAL_REMOVE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 9, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Boolean> EVAL_FAST_PUT_TTL = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 9, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_GET_TTL = new RedisCommand<Object>("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE);
@ -283,7 +279,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public RFuture<Long> removeAsync(Object key, Object value) {
public RFuture<Boolean> removeAsync(Object key, Object value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if value == false then "

@ -86,10 +86,6 @@ public class RedissonReference {
this.codec = codec != null ? codec.getClass().getName() : null;
}
public boolean isDefaultCodec() {
return codec == null;
}
/**
* @return the type
* @throws java.lang.Exception - which could be:
@ -152,6 +148,10 @@ public class RedissonReference {
public void setKeyName(String keyName) {
this.keyName = keyName;
}
public String getCodec() {
return codec;
}
/**
* @return the codec

@ -15,8 +15,6 @@
*/
package org.redisson.api;
import java.util.Map;
/**
* Map object with entry cache support.
* <p>
@ -28,31 +26,6 @@ import java.util.Map;
* @param <K>
* @param <V>
*/
public interface RLocalCachedMap<K, V> extends Map<K, V>, RExpirable, RLocalCachedMapAsync<K, V>, RDestroyable {
public interface RLocalCachedMap<K, V> extends RMap<K, V>, RExpirable, RDestroyable {
/**
* Associates the specified <code>value</code> with the specified <code>key</code>.
* <p>
* Works faster than <code>RLocalCachedMap.put</code> but not returning
* the previous value associated with <code>key</code>
*
* @param key
* @param value
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
boolean fastPut(K key, V value);
/**
* Removes <code>key</code> from map
* <p>
* Works faster than <code>RLocalCachedMap.remove</code> but not returning
* the value associated with <code>key</code>
*
* @param key
* @return <code>true</code> if key has been deleted.
* <code>false</code> if key doesn't exist.
*/
boolean fastRemove(Object key);
}

@ -100,7 +100,7 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
RFuture<Boolean> replaceAsync(K key, V oldValue, V newValue);
RFuture<Long> removeAsync(Object key, Object value);
RFuture<Boolean> removeAsync(Object key, Object value);
RFuture<V> putIfAbsentAsync(K key, V value);

@ -77,7 +77,7 @@ public interface RMapReactive<K, V> extends RExpirableReactive {
Publisher<Boolean> replace(K key, V oldValue, V newValue);
Publisher<Long> remove(Object key, Object value);
Publisher<Boolean> remove(Object key, Object value);
Publisher<V> putIfAbsent(K key, V value);

@ -28,7 +28,7 @@ import org.redisson.api.annotation.RObjectField;
*/
public class DefaultCodecProvider implements CodecProvider {
public final ConcurrentMap<Class<? extends Codec>, Codec> codecCache = PlatformDependent.newConcurrentHashMap();
public transient final ConcurrentMap<Class<? extends Codec>, Codec> codecCache = PlatformDependent.newConcurrentHashMap();
@Override
public <T extends Codec> T getCodec(Class<T> codecClass) {

@ -768,73 +768,80 @@ public class CommandAsyncService implements CommandAsyncExecutor {
((RedisClientResult)res).setRedisClient(addr);
}
if (isRedissonReferenceSupportEnabled() && (res instanceof List || res instanceof ListScanResult)) {
List r = res instanceof ListScanResult ? ((ListScanResult)res).getValues() : (List) res;
for (int i = 0; i < r.size(); i++) {
if (r.get(i) instanceof RedissonReference) {
try {
r.set(i ,(redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) r.get(i))
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) r.get(i))));
} catch (Exception exception) {//skip and carry on to next one.
}
} else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) {
try {
ScoredEntry se = ((ScoredEntry) r.get(i));
r.set(i ,new ScoredEntry(se.getScore(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) se.getValue())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue())));
} catch (Exception exception) {//skip and carry on to next one.
}
}
}
if (isRedissonReferenceSupportEnabled()) {
handleReference(details.getMainPromise(), res);
} else {
details.getMainPromise().trySuccess(res);
} else if (isRedissonReferenceSupportEnabled() && (res instanceof MapScanResult)) {
Map<ScanObjectEntry, ScanObjectEntry> map = ((MapScanResult)res).getMap();
HashMap<ScanObjectEntry, ScanObjectEntry> toAdd = null;
for (Map.Entry<ScanObjectEntry, ScanObjectEntry> e : (Set<Map.Entry<ScanObjectEntry, ScanObjectEntry>>) map.entrySet()) {
if (e.getValue().getObj() instanceof RedissonReference) {
try {
e.setValue(new ScanObjectEntry(e.getValue().getBuf(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) e.getValue().getObj())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) e.getValue().getObj())));
} catch (Exception exception) {//skip and carry on to next one.
}
}
} else {
details.getMainPromise().tryFailure(future.cause());
}
AsyncDetails.release(details);
}
private <R, V> void handleReference(RPromise<R> mainPromise, R res) {
if (res instanceof List || res instanceof ListScanResult) {
List r = res instanceof ListScanResult ? ((ListScanResult)res).getValues() : (List) res;
for (int i = 0; i < r.size(); i++) {
if (r.get(i) instanceof RedissonReference) {
try {
r.set(i ,(redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) r.get(i))
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) r.get(i))));
} catch (Exception exception) {//skip and carry on to next one.
}
if (e.getKey().getObj() instanceof RedissonReference) {
if (toAdd == null) {
toAdd = new HashMap<ScanObjectEntry, ScanObjectEntry>();
}
toAdd.put(e.getKey(), e.getValue());
} else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) {
try {
ScoredEntry se = ((ScoredEntry) r.get(i));
r.set(i ,new ScoredEntry(se.getScore(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) se.getValue())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue())));
} catch (Exception exception) {//skip and carry on to next one.
}
}
if (toAdd != null) {
for (Map.Entry<ScanObjectEntry, ScanObjectEntry> e : (Set<Map.Entry<ScanObjectEntry, ScanObjectEntry>>) toAdd.entrySet()) {
try {
map.put(new ScanObjectEntry(e.getValue().getBuf(), (redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) e.getKey().getObj())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) e.getKey().getObj()))), map.remove(e.getKey()));
} catch (Exception exception) {//skip and carry on to next one.
}
}
mainPromise.trySuccess(res);
} else if (res instanceof MapScanResult) {
Map<ScanObjectEntry, ScanObjectEntry> map = ((MapScanResult)res).getMap();
HashMap<ScanObjectEntry, ScanObjectEntry> toAdd = null;
for (Map.Entry<ScanObjectEntry, ScanObjectEntry> e : map.entrySet()) {
if (e.getValue().getObj() instanceof RedissonReference) {
try {
e.setValue(new ScanObjectEntry(e.getValue().getBuf(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) e.getValue().getObj())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) e.getValue().getObj())));
} catch (Exception exception) {//skip and carry on to next one.
}
}
details.getMainPromise().trySuccess(res);
} else if (isRedissonReferenceSupportEnabled() && (res instanceof MapScanResult)) {
} else if (isRedissonReferenceSupportEnabled() && res instanceof RedissonReference) {
try {
details.getMainPromise().trySuccess(redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) res)
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) res));
} catch (Exception exception) {
details.getMainPromise().trySuccess(res);//fallback
if (e.getKey().getObj() instanceof RedissonReference) {
if (toAdd == null) {
toAdd = new HashMap<ScanObjectEntry, ScanObjectEntry>();
}
toAdd.put(e.getKey(), e.getValue());
}
} else {
details.getMainPromise().trySuccess(res);
}
if (toAdd != null) {
for (Map.Entry<ScanObjectEntry, ScanObjectEntry> e : toAdd.entrySet()) {
try {
map.put(new ScanObjectEntry(e.getValue().getBuf(), (redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) e.getKey().getObj())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) e.getKey().getObj()))), map.remove(e.getKey()));
} catch (Exception exception) {//skip and carry on to next one.
}
}
}
mainPromise.trySuccess(res);
} else if (res instanceof RedissonReference) {
try {
mainPromise.trySuccess(redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) res)
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) res));
} catch (Exception exception) {
mainPromise.trySuccess(res);//fallback
}
} else {
details.getMainPromise().tryFailure(future.cause());
mainPromise.trySuccess(res);
}
AsyncDetails.release(details);
}
}

@ -203,8 +203,8 @@ public class ConfigSupport {
mapper.addMixIn(MasterSlaveServersConfig.class, MasterSlaveServersConfigMixIn.class);
mapper.addMixIn(SingleServerConfig.class, SingleSeverConfigMixIn.class);
mapper.addMixIn(Config.class, ConfigMixIn.class);
mapper.addMixIn(CodecProvider.class, ConfigMixIn.class);
mapper.addMixIn(ResolverProvider.class, ConfigMixIn.class);
mapper.addMixIn(CodecProvider.class, ClassMixIn.class);
mapper.addMixIn(ResolverProvider.class, ClassMixIn.class);
mapper.addMixIn(Codec.class, ClassMixIn.class);
mapper.addMixIn(RedissonNodeInitializer.class, ClassMixIn.class);
mapper.addMixIn(LoadBalancer.class, ClassMixIn.class);

@ -41,6 +41,13 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
* Base connection pool class
*
* @author Nikita Koksharov
*
* @param <T> - connection type
*/
abstract class ConnectionPool<T extends RedisConnection> {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -211,8 +218,6 @@ abstract class ConnectionPool<T extends RedisConnection> {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
releaseConnection(entry);
promiseFailure(entry, promise, future.cause());
return;
}
@ -223,13 +228,13 @@ abstract class ConnectionPool<T extends RedisConnection> {
return;
}
promiseSuccessful(entry, promise, conn);
connectedSuccessful(entry, promise, conn);
}
});
return promise;
}
private void promiseSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
entry.resetFailedAttempts();
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
@ -247,15 +252,20 @@ abstract class ConnectionPool<T extends RedisConnection> {
checkForReconnect(entry);
}
releaseConnection(entry);
promise.tryFailure(cause);
}
private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
conn.closeAsync();
checkForReconnect(entry);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
} else {
conn.closeAsync();
}
releaseConnection(entry);
@ -267,9 +277,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
private RFuture<T> promiseFailure(ClientConnectionsEntry entry, T conn) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
conn.closeAsync();
checkForReconnect(entry);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
} else {
conn.closeAsync();
}
releaseConnection(entry);

@ -22,6 +22,12 @@ import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.ClientConnectionsEntry;
/**
* Connection pool for master node
*
* @author Nikita Koksharov
*
*/
public class MasterConnectionPool extends ConnectionPool<RedisConnection> {
public MasterConnectionPool(MasterSlaveServersConfig config,
ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {

@ -22,6 +22,12 @@ import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
/**
* Connection pool for Publish / Subscribe
*
* @author Nikita Koksharov
*
*/
public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection> {
public PubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {

@ -20,6 +20,12 @@ import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
/**
* Connection pool for Publish/Subscribe used with single node
*
* @author Nikita Koksharov
*
*/
public class SinglePubSubConnectionPool extends PubSubConnectionPool {
public SinglePubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager,

@ -21,6 +21,12 @@ import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
/**
* Connection pool for slave node
*
* @author Nikita Koksharov
*
*/
public class SlaveConnectionPool extends ConnectionPool<RedisConnection> {
public SlaveConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager,

@ -25,10 +25,11 @@ import org.redisson.liveobject.resolver.Resolver;
* @author Rui Gu (https://github.com/jackygurui)
*/
public class DefaultResolverProvider implements ResolverProvider {
public final ConcurrentMap<Class<? extends Resolver>, Resolver> providerCache = PlatformDependent.newConcurrentHashMap();
public transient final ConcurrentMap<Class<? extends Resolver>, Resolver<?, ?, ?>> providerCache = PlatformDependent.newConcurrentHashMap();
@Override
public Resolver getResolver(Class<?> cls, Class<? extends Resolver> resolverClass, Annotation anno) {
public Resolver<?, ?, ?> getResolver(Class<?> cls, Class<? extends Resolver> resolverClass, Annotation anno) {
if (!providerCache.containsKey(resolverClass)) {
try {
providerCache.putIfAbsent(resolverClass, resolverClass.newInstance());

@ -118,8 +118,8 @@ public class RedissonObjectFactory {
List<Class<?>> interfaces = Arrays.asList(type.getInterfaces());
for (Class<?> iType : interfaces) {
if (builders.containsKey(iType)) {// user cache to speed up things a little.
Method builder = builders.get(iType).get(rr.isDefaultCodec());
return (T) (rr.isDefaultCodec()
Method builder = builders.get(iType).get(isDefaultCodec(rr));
return (T) (isDefaultCodec(rr)
? builder.invoke(redisson, rr.getKeyName())
: builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType())));
}
@ -128,6 +128,11 @@ public class RedissonObjectFactory {
throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodecName());
}
public static boolean isDefaultCodec(RedissonReference rr) {
return rr.getCodec() == null;
}
public static <T> T fromReference(RedissonReactiveClient redisson, RedissonReference rr) throws Exception {
return fromReference(redisson, rr, null);
}
@ -142,8 +147,8 @@ public class RedissonObjectFactory {
List<Class<?>> interfaces = Arrays.asList(type.getInterfaces());
for (Class<?> iType : interfaces) {
if (builders.containsKey(iType)) {// user cache to speed up things a little.
Method builder = builders.get(iType).get(rr.isDefaultCodec());
return (T) (rr.isDefaultCodec()
Method builder = builders.get(iType).get(isDefaultCodec(rr));
return (T) (isDefaultCodec(rr)
? builder.invoke(redisson, rr.getKeyName())
: builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType())));
}

@ -36,7 +36,6 @@ import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
@ -74,7 +73,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
private static final RedisCommand<MapScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapScanResult<Object, Object>>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Boolean> EVAL_REMOVE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<List<Object>> EVAL_GET_TTL = new RedisCommand<List<Object>>("EVAL", new TTLMapValueReplayDecoder<Object>(), 5, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_KEY);
@ -225,7 +224,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
}
@Override
public Publisher<Long> remove(Object key, Object value) {
public Publisher<Boolean> remove(Object key, Object value) {
return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('zrem', KEYS[2], ARGV[1]); "

@ -78,6 +78,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
return reactive(instance.getAllAsync(keys));
}
@Override
public Publisher<Void> putAll(Map<? extends K, ? extends V> map) {
return reactive(instance.putAllAsync(map));
}
@ -88,7 +89,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
}
@Override
public Publisher<Long> remove(Object key, Object value) {
public Publisher<Boolean> remove(Object key, Object value) {
return reactive(instance.removeAsync(key, value));
}

@ -2,6 +2,9 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -9,9 +12,12 @@ import org.junit.Assert;
import org.junit.Test;
import org.redisson.RedissonLocalCachedMap.CacheKey;
import org.redisson.RedissonLocalCachedMap.CacheValue;
import org.redisson.RedissonMapTest.SimpleKey;
import org.redisson.RedissonMapTest.SimpleValue;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.misc.Cache;
import mockit.Deencapsulation;
@ -62,8 +68,8 @@ public class RedissonLocalCachedMapTest extends BaseTest {
map2.put("2", 4);
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(0);
assertThat(cache2.size()).isEqualTo(0);
assertThat(cache1.size()).isEqualTo(1);
assertThat(cache2.size()).isEqualTo(1);
}
@Test
@ -197,18 +203,267 @@ public class RedissonLocalCachedMapTest extends BaseTest {
@Test
public void testPut() {
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put("12", 1);
map.put("14", 2);
map.put("15", 3);
Deencapsulation.setField(map, "map", null);
assertThat(cache).containsValues(new CacheValue("12", 1), new CacheValue("12", 2), new CacheValue("15", 3));
assertThat(map.get("12")).isEqualTo(1);
assertThat(map.get("14")).isEqualTo(2);
assertThat(map.get("15")).isEqualTo(3);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
assertThat(map1.get("12")).isEqualTo(1);
assertThat(map1.get("14")).isEqualTo(2);
assertThat(map1.get("15")).isEqualTo(3);
}
@Test
public void testGetAll() {
RMap<String, Integer> map = redisson.getLocalCachedMap("getAll", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put("1", 100);
map.put("2", 200);
map.put("3", 300);
map.put("4", 400);
assertThat(cache.size()).isEqualTo(4);
Map<String, Integer> filtered = map.getAll(new HashSet<String>(Arrays.asList("2", "3", "5")));
Map<String, Integer> expectedMap = new HashMap<String, Integer>();
expectedMap.put("2", 200);
expectedMap.put("3", 300);
assertThat(filtered).isEqualTo(expectedMap);
RMap<String, Integer> map1 = redisson.getLocalCachedMap("getAll", LocalCachedMapOptions.defaults());
Map<String, Integer> filtered1 = map1.getAll(new HashSet<String>(Arrays.asList("2", "3", "5")));
assertThat(filtered1).isEqualTo(expectedMap);
}
@Test
public void testPutAll() {
Map<Integer, String> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Map<Integer, String> map1 = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
map.put(1, "1");
map.put(2, "2");
map.put(3, "3");
Map<Integer, String> joinMap = new HashMap<Integer, String>();
joinMap.put(4, "4");
joinMap.put(5, "5");
joinMap.put(6, "6");
map.putAll(joinMap);
assertThat(cache.size()).isEqualTo(6);
assertThat(cache1.size()).isEqualTo(0);
assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6);
map1.putAll(joinMap);
assertThat(cache.size()).isEqualTo(3);
assertThat(cache1.size()).isEqualTo(3);
}
@Test
public void testAddAndGet() throws InterruptedException {
RMap<Integer, Integer> map = redisson.getLocalCachedMap("getAll", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(1, 100);
Integer res = map.addAndGet(1, 12);
assertThat(cache.size()).isEqualTo(1);
assertThat(res).isEqualTo(112);
res = map.get(1);
assertThat(res).isEqualTo(112);
RMap<Integer, Double> map2 = redisson.getLocalCachedMap("getAll2", LocalCachedMapOptions.defaults());
map2.put(1, new Double(100.2));
Double res2 = map2.addAndGet(1, new Double(12.1));
assertThat(res2).isEqualTo(112.3);
res2 = map2.get(1);
assertThat(res2).isEqualTo(112.3);
RMap<String, Integer> mapStr = redisson.getLocalCachedMap("mapStr", LocalCachedMapOptions.defaults());
assertThat(mapStr.put("1", 100)).isNull();
assertThat(mapStr.addAndGet("1", 12)).isEqualTo(112);
assertThat(mapStr.get("1")).isEqualTo(112);
assertThat(cache.size()).isEqualTo(1);
}
@Test
public void testFastPutIfAbsent() throws Exception {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
SimpleKey key = new SimpleKey("1");
SimpleValue value = new SimpleValue("2");
map.put(key, value);
assertThat(map.fastPutIfAbsent(key, new SimpleValue("3"))).isFalse();
assertThat(cache.size()).isEqualTo(1);
assertThat(map.get(key)).isEqualTo(value);
SimpleKey key1 = new SimpleKey("2");
SimpleValue value1 = new SimpleValue("4");
assertThat(map.fastPutIfAbsent(key1, value1)).isTrue();
assertThat(cache.size()).isEqualTo(2);
assertThat(map.get(key1)).isEqualTo(value1);
}
@Test
public void testReadAllEntrySet() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(new SimpleKey("1"), new SimpleValue("2"));
map.put(new SimpleKey("33"), new SimpleValue("44"));
map.put(new SimpleKey("5"), new SimpleValue("6"));
assertThat(map.readAllEntrySet().size()).isEqualTo(3);
assertThat(cache.size()).isEqualTo(3);
Map<SimpleKey, SimpleValue> testMap = new HashMap<>(map);
assertThat(map.readAllEntrySet()).containsOnlyElementsOf(testMap.entrySet());
RMap<SimpleKey, SimpleValue> map2 = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults());
assertThat(map2.readAllEntrySet()).containsOnlyElementsOf(testMap.entrySet());
}
@Test
public void testPutIfAbsent() throws Exception {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
SimpleKey key = new SimpleKey("1");
SimpleValue value = new SimpleValue("2");
map.put(key, value);
Assert.assertEquals(value, map.putIfAbsent(key, new SimpleValue("3")));
Assert.assertEquals(value, map.get(key));
SimpleKey key1 = new SimpleKey("2");
SimpleValue value1 = new SimpleValue("4");
Assert.assertNull(map.putIfAbsent(key1, value1));
Assert.assertEquals(value1, map.get(key1));
assertThat(cache.size()).isEqualTo(2);
}
@Test
public void testRemoveValue() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(new SimpleKey("1"), new SimpleValue("2"));
boolean res = map.remove(new SimpleKey("1"), new SimpleValue("2"));
Assert.assertTrue(res);
SimpleValue val1 = map.get(new SimpleKey("1"));
Assert.assertNull(val1);
Assert.assertEquals(0, map.size());
assertThat(cache.size()).isEqualTo(0);
}
@Test
public void testRemoveValueFail() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(new SimpleKey("1"), new SimpleValue("2"));
boolean res = map.remove(new SimpleKey("2"), new SimpleValue("1"));
Assert.assertFalse(res);
boolean res1 = map.remove(new SimpleKey("1"), new SimpleValue("3"));
Assert.assertFalse(res1);
SimpleValue val1 = map.get(new SimpleKey("1"));
Assert.assertEquals("2", val1.getValue());
assertThat(cache.size()).isEqualTo(1);
}
@Test
public void testReplaceOldValueFail() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(new SimpleKey("1"), new SimpleValue("2"));
boolean res = map.replace(new SimpleKey("1"), new SimpleValue("43"), new SimpleValue("31"));
Assert.assertFalse(res);
SimpleValue val1 = map.get(new SimpleKey("1"));
Assert.assertEquals("2", val1.getValue());
assertThat(cache.size()).isEqualTo(1);
}
@Test
public void testReplaceOldValueSuccess() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(new SimpleKey("1"), new SimpleValue("2"));
boolean res = map.replace(new SimpleKey("1"), new SimpleValue("2"), new SimpleValue("3"));
Assert.assertTrue(res);
boolean res1 = map.replace(new SimpleKey("1"), new SimpleValue("2"), new SimpleValue("3"));
Assert.assertFalse(res1);
SimpleValue val1 = map.get(new SimpleKey("1"));
Assert.assertEquals("3", val1.getValue());
assertThat(cache.size()).isEqualTo(1);
}
@Test
public void testReplaceValue() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(new SimpleKey("1"), new SimpleValue("2"));
SimpleValue res = map.replace(new SimpleKey("1"), new SimpleValue("3"));
Assert.assertEquals("2", res.getValue());
assertThat(cache.size()).isEqualTo(1);
SimpleValue val1 = map.get(new SimpleKey("1"));
Assert.assertEquals("3", val1.getValue());
}
@Test
public void testReadAllValues() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(new SimpleKey("1"), new SimpleValue("2"));
map.put(new SimpleKey("33"), new SimpleValue("44"));
map.put(new SimpleKey("5"), new SimpleValue("6"));
assertThat(cache.size()).isEqualTo(3);
assertThat(map.readAllValues().size()).isEqualTo(3);
Map<SimpleKey, SimpleValue> testMap = new HashMap<>(map);
assertThat(map.readAllValues()).containsOnlyElementsOf(testMap.values());
RMap<SimpleKey, SimpleValue> map2 = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
assertThat(map2.readAllValues()).containsOnlyElementsOf(testMap.values());
}
@Test
public void testFastRemoveAsync() throws InterruptedException, ExecutionException {
RMap<Integer, Integer> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(1, 3);
map.put(3, 5);
map.put(4, 6);
map.put(7, 8);
assertThat(map.fastRemoveAsync(1, 3, 7).get()).isEqualTo(3);
assertThat(cache.size()).isEqualTo(1);
assertThat(map.size()).isEqualTo(1);
}
@Test
public void testRemove() {
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
@ -225,13 +480,14 @@ public class RedissonLocalCachedMapTest extends BaseTest {
}
@Test
public void testFastRemoveAsync() throws InterruptedException, ExecutionException {
public void testFastRemove() throws InterruptedException, ExecutionException {
RLocalCachedMap<Integer, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
map.put(1, 3);
map.put(2, 4);
map.put(7, 8);
assertThat(map.fastRemoveAsync(1).get()).isTrue();
assertThat(map.fastRemoveAsync(2).get()).isFalse();
assertThat(map.fastRemove(1, 2)).isEqualTo(2);
assertThat(map.fastRemove(2)).isEqualTo(0);
assertThat(map.size()).isEqualTo(1);
}

@ -278,8 +278,8 @@ public class RedissonMapCacheReactiveTest extends BaseReactiveTest {
RMapCacheReactive<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2"), 1, TimeUnit.SECONDS));
long res = sync(map.remove(new SimpleKey("1"), new SimpleValue("2")));
Assert.assertEquals(1, res);
boolean res = sync(map.remove(new SimpleKey("1"), new SimpleValue("2")));
Assert.assertTrue(res);
SimpleValue val1 = sync(map.get(new SimpleKey("1")));
Assert.assertNull(val1);
@ -373,9 +373,9 @@ public class RedissonMapCacheReactiveTest extends BaseReactiveTest {
@Test
public void testEmptyRemove() {
RMapCacheReactive<Integer, Integer> map = redisson.getMapCache("simple");
Assert.assertEquals(0, sync(map.remove(1, 3)).longValue());
assertThat(sync(map.remove(1, 3))).isEqualTo(0);
sync(map.put(4, 5));
Assert.assertEquals(1, sync(map.remove(4, 5)).longValue());
assertThat(sync(map.remove(4, 5))).isEqualTo(1);
}
@Test

@ -307,8 +307,8 @@ public class RedissonMapReactiveTest extends BaseReactiveTest {
RMapReactive<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
long size = sync(map.remove(new SimpleKey("1"), new SimpleValue("2")));
Assert.assertEquals(1, size);
boolean size = sync(map.remove(new SimpleKey("1"), new SimpleValue("2")));
Assert.assertTrue(size);
SimpleValue val1 = sync(map.get(new SimpleKey("1")));
Assert.assertNull(val1);
@ -321,11 +321,11 @@ public class RedissonMapReactiveTest extends BaseReactiveTest {
RMapReactive<SimpleKey, SimpleValue> map = redisson.getMap("simple");
sync(map.put(new SimpleKey("1"), new SimpleValue("2")));
long size = sync(map.remove(new SimpleKey("2"), new SimpleValue("1")));
Assert.assertEquals(0, size);
boolean removed = sync(map.remove(new SimpleKey("2"), new SimpleValue("1")));
Assert.assertFalse(removed);
long size2 = sync(map.remove(new SimpleKey("1"), new SimpleValue("3")));
Assert.assertEquals(0, size2);
boolean size2 = sync(map.remove(new SimpleKey("1"), new SimpleValue("3")));
Assert.assertFalse(size2);
SimpleValue val1 = sync(map.get(new SimpleKey("1")));
Assert.assertEquals("2", val1.getValue());
@ -444,9 +444,9 @@ public class RedissonMapReactiveTest extends BaseReactiveTest {
@Test
public void testEmptyRemove() {
RMapReactive<Integer, Integer> map = redisson.getMap("simple");
Assert.assertEquals(0, sync(map.remove(1, 3)).intValue());
assertThat(sync(map.remove(1, 3))).isFalse();
sync(map.put(4, 5));
Assert.assertEquals(1, sync(map.remove(4, 5)).intValue());
assertThat(sync(map.remove(4, 5))).isTrue();
}
@Test

Loading…
Cancel
Save