RLocalCachedMap implemented. #592

pull/605/head
Nikita 9 years ago
parent 3d4b5d799b
commit 5261be134e

@ -41,33 +41,33 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.0.40.Final</version>
<version>4.0.41.Final</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.0.40.Final</version>
<version>4.0.41.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>4.0.40.Final</version>
<version>4.0.41.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.0.40.Final</version>
<version>4.0.41.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.0.40.Final</version>
<version>4.0.41.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.0.40.Final</version>
<version>4.0.41.Final</version>
</dependency>
<dependency>
@ -106,6 +106,12 @@
<version>1.7.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<version>1.27</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>

@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.api.ClusterNodesGroup;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.Node;
import org.redisson.api.NodesGroup;
import org.redisson.api.RAtomicDouble;
@ -42,6 +43,7 @@ import org.redisson.api.RList;
import org.redisson.api.RListMultimap;
import org.redisson.api.RListMultimapCache;
import org.redisson.api.RLiveObjectService;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
@ -216,6 +218,16 @@ public class Redisson implements RedissonClient {
return new RedissonListMultimap<K, V>(codec, commandExecutor, name);
}
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(this, commandExecutor, name, options);
}
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(this, codec, commandExecutor, name, options);
}
@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(commandExecutor, name);

@ -0,0 +1,690 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.AbstractCollection;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.Cache;
import org.redisson.misc.Hash;
import org.redisson.misc.LFUCacheMap;
import org.redisson.misc.LRUCacheMap;
import org.redisson.misc.NoneCacheMap;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonLocalCachedMap<K, V> extends RedissonExpirable implements RLocalCachedMap<K, V> {
public static class LocalCachedMapClear {
}
public static class LocalCachedMapInvalidate {
private byte[] keyHash;
public LocalCachedMapInvalidate() {
}
public LocalCachedMapInvalidate(byte[] keyHash) {
super();
this.keyHash = keyHash;
}
public byte[] getKeyHash() {
return keyHash;
}
}
public static class CacheKey {
private final byte[] keyHash;
public CacheKey(byte[] keyHash) {
super();
this.keyHash = keyHash;
}
public byte[] getKeyHash() {
return keyHash;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(keyHash);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
CacheKey other = (CacheKey) obj;
if (!Arrays.equals(keyHash, other.keyHash))
return false;
return true;
}
@Override
public String toString() {
return "CacheKey [keyHash=" + Arrays.toString(keyHash) + "]";
}
}
public static class CacheValue {
private final Object key;
private final Object value;
public CacheValue(Object key, Object value) {
super();
this.key = key;
this.value = value;
}
public Object getKey() {
return key;
}
public Object getValue() {
return value;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
CacheValue other = (CacheValue) obj;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}
@Override
public String toString() {
return "CacheValue [key=" + key + ", value=" + value + "]";
}
}
private static final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private RTopic<Object> invalidationTopic;
private RMap<K, V> map;
private Cache<CacheKey, CacheValue> cache;
private int invalidateEntryOnChange;
protected RedissonLocalCachedMap(RedissonClient redisson, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options) {
super(commandExecutor, name);
init(redisson, name, options);
}
protected RedissonLocalCachedMap(RedissonClient redisson, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options) {
super(codec, connectionManager, name);
init(redisson, name, options);
}
private void init(RedissonClient redisson, String name, LocalCachedMapOptions options) {
map = redisson.getMap(name);
if (options.isInvalidateEntryOnChange()) {
invalidateEntryOnChange = 1;
}
if (options.getEvictionPolicy() == EvictionPolicy.NONE) {
cache = new NoneCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LRU) {
cache = new LRUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
cache = new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
invalidationTopic = redisson.getTopic(name + ":topic");
invalidationTopic.addListener(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapClear) {
cache.clear();
}
if (msg instanceof LocalCachedMapInvalidate) {
CacheKey key = new CacheKey(((LocalCachedMapInvalidate)msg).getKeyHash());
cache.remove(key);
}
}
});
}
@Override
public int size() {
return get(sizeAsync());
}
@Override
public RFuture<Integer> sizeAsync() {
return map.sizeAsync();
}
@Override
public boolean isEmpty() {
return map.isEmpty();
}
@Override
public boolean containsKey(Object key) {
return get(containsKeyAsync(key));
}
private CacheKey toCacheKey(Object key) {
byte[] encoded = encodeMapKey(key);
return toCacheKey(encoded);
}
private CacheKey toCacheKey(byte[] encodedKey) {
return new CacheKey(Hash.hash(encodedKey));
}
@Override
public RFuture<Boolean> containsKeyAsync(Object key) {
CacheKey cacheKey = toCacheKey(key);
if (!cache.containsKey(cacheKey)) {
return map.containsKeyAsync(key);
}
return newSucceededFuture(true);
}
@Override
public boolean containsValue(Object value) {
return get(containsValueAsync(value));
}
@Override
public RFuture<Boolean> containsValueAsync(Object value) {
CacheValue cacheValue = new CacheValue(null, value);
if (!cache.containsValue(cacheValue)) {
return map.containsValueAsync(value);
}
return newSucceededFuture(true);
}
@Override
public V get(Object key) {
return get(getAsync(key));
}
@Override
public RFuture<V> getAsync(final Object key) {
if (key == null) {
throw new NullPointerException();
}
final CacheKey cacheKey = toCacheKey(key);
CacheValue cacheValue = cache.get(cacheKey);
if (cacheValue != null && cacheValue.getValue() != null) {
return newSucceededFuture((V)cacheValue.getValue());
}
RFuture<V> future = map.getAsync((K)key);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
return;
}
V value = future.getNow();
if (value != null) {
cache.put(cacheKey, new CacheValue(key, value));
}
}
});
return future;
}
@Override
public V put(K key, V value) {
return get(putAsync(key, value));
}
@Override
public RFuture<V> putAsync(K key, V value) {
if (key == null) {
throw new NullPointerException();
}
if (value == null) {
throw new NullPointerException();
}
byte[] mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] msg = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 and ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end; "
+ "return v; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
mapKey, encodeMapValue(value), msg, invalidateEntryOnChange);
}
@Override
public boolean fastPut(K key, V value) {
return get(fastPutAsync(key, value));
}
@Override
public RFuture<Boolean> fastPutAsync(K key, V value) {
if (key == null) {
throw new NullPointerException();
}
if (value == null) {
throw new NullPointerException();
}
byte[] encodedKey = encodeMapKey(key);
byte[] encodedValue = encodeMapKey(value);
CacheKey cacheKey = toCacheKey(encodedKey);
byte[] msg = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ "if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "return 0; "
+ "end; "
+ "return 1; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
encodedKey, encodedValue, msg, invalidateEntryOnChange);
}
@Override
public V remove(Object key) {
return get(removeAsync((K)key));
}
@Override
public RFuture<V> removeAsync(K key) {
if (key == null) {
throw new NullPointerException();
}
byte[] keyEncoded = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
cache.remove(cacheKey);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 and ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[2]); "
+ "end; "
+ "return v",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
keyEncoded, msgEncoded, invalidateEntryOnChange);
}
@Override
public boolean fastRemove(Object key) {
return get(fastRemoveAsync((K)key));
}
@Override
public RFuture<Boolean> fastRemoveAsync(K key) {
if (key == null) {
throw new NullPointerException();
}
byte[] keyEncoded = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
cache.remove(cacheKey);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[2]); "
+ "end; "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
keyEncoded, msgEncoded, invalidateEntryOnChange);
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
Map<CacheKey, CacheValue> cacheMap = new HashMap<CacheKey, CacheValue>(m.size());
for (java.util.Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
CacheKey cacheKey = toCacheKey(entry.getKey());
CacheValue cacheValue = new CacheValue(entry.getKey(), entry.getValue());
cacheMap.put(cacheKey, cacheValue);
}
cache.putAll(cacheMap);
map.putAll(m);
for (CacheKey cacheKey : cacheMap.keySet()) {
invalidationTopic.publish(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
}
}
@Override
public void clear() {
delete();
}
@Override
public RFuture<Boolean> deleteAsync() {
cache.clear();
byte[] msgEncoded = encode(new LocalCachedMapClear());
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1]) == 1 and ARGV[2] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "end; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
msgEncoded, invalidateEntryOnChange);
}
@Override
public Set<K> keySet() {
return new KeySet();
}
@Override
public Collection<V> values() {
return new Values();
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
return new EntrySet();
}
private Iterator<Map.Entry<K,V>> cacheEntrySetIterator() {
final Iterator<Map.Entry<CacheKey, CacheValue>> iter = cache.entrySet().iterator();
return new Iterator<Map.Entry<K,V>>() {
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public java.util.Map.Entry<K, V> next() {
Map.Entry<CacheKey, CacheValue> entry = iter.next();
return new AbstractMap.SimpleEntry(entry.getValue().getKey(), entry.getValue().getValue());
}
@Override
public void remove() {
iter.remove();
}
};
}
private Iterator<K> cacheKeySetIterator() {
final Iterator<CacheValue> iter = cache.values().iterator();
return new Iterator<K>() {
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public K next() {
CacheValue value = iter.next();
return (K) value.getKey();
}
@Override
public void remove() {
iter.remove();
}
};
}
final class KeySet extends AbstractSet<K> {
@Override
public Iterator<K> iterator() {
return new CompositeIterable<K>(cacheKeySetIterator(), map.keySet().iterator()) {
@Override
boolean isCacheContains(Object object) {
CacheKey cacheKey = toCacheKey(object);
return cache.containsKey(cacheKey);
}
};
}
@Override
public boolean contains(Object o) {
return RedissonLocalCachedMap.this.containsKey(o);
}
@Override
public boolean remove(Object o) {
return RedissonLocalCachedMap.this.remove(o) != null;
}
@Override
public int size() {
return RedissonLocalCachedMap.this.size();
}
@Override
public void clear() {
RedissonLocalCachedMap.this.clear();
}
}
final class Values extends AbstractCollection<V> {
@Override
public Iterator<V> iterator() {
final Iterator<Map.Entry<K, V>> iter = RedissonLocalCachedMap.this.entrySet().iterator();
return new Iterator<V>() {
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public V next() {
return iter.next().getValue();
}
@Override
public void remove() {
iter.remove();
}
};
}
@Override
public boolean contains(Object o) {
return RedissonLocalCachedMap.this.containsValue(o);
}
@Override
public int size() {
return RedissonLocalCachedMap.this.size();
}
@Override
public void clear() {
RedissonLocalCachedMap.this.clear();
}
}
final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
public final Iterator<Map.Entry<K,V>> iterator() {
return new CompositeIterable<Map.Entry<K,V>>(cacheEntrySetIterator(), map.entrySet().iterator()) {
@Override
boolean isCacheContains(Map.Entry<K,V> entry) {
CacheKey cacheKey = toCacheKey(entry.getKey());
return cache.containsKey(cacheKey);
}
};
}
public final boolean contains(Object o) {
if (!(o instanceof Map.Entry))
return false;
Map.Entry<?,?> e = (Map.Entry<?,?>) o;
Object key = e.getKey();
V value = get(key);
return value != null && value.equals(e);
}
public final boolean remove(Object o) {
if (o instanceof Map.Entry) {
Map.Entry<?,?> e = (Map.Entry<?,?>) o;
Object key = e.getKey();
Object value = e.getValue();
return RedissonLocalCachedMap.this.map.remove(key, value);
}
return false;
}
public final int size() {
return RedissonLocalCachedMap.this.size();
}
public final void clear() {
RedissonLocalCachedMap.this.clear();
}
}
abstract class CompositeIterable<T> implements Iterator<T> {
private T currentObject;
private Iterator<T> cacheIterator;
private Iterator<T> mapIterator;
public CompositeIterable(Iterator<T> cacheIterator, Iterator<T> mapIterator) {
this.cacheIterator = cacheIterator;
this.mapIterator = mapIterator;
}
@Override
public boolean hasNext() {
if (!cacheIterator.hasNext()) {
while (true) {
if (mapIterator.hasNext()) {
currentObject = mapIterator.next();
if (!isCacheContains(currentObject)) {
return true;
}
} else {
break;
}
}
return false;
}
return true;
}
abstract boolean isCacheContains(T object);
@Override
public T next() {
if (currentObject != null) {
T val = currentObject;
currentObject = null;
return val;
}
if (!hasNext()) {
throw new NoSuchElementException();
}
return cacheIterator.next();
}
@Override
public void remove() {
if (currentObject != null) {
mapIterator.remove();
currentObject = null;
return;
}
cacheIterator.remove();
}
}
}

@ -140,5 +140,21 @@ abstract class RedissonObject implements RObject {
throw new IllegalArgumentException(e);
}
}
protected byte[] encodeMapKey(Object value) {
try {
return codec.getMapKeyEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
protected byte[] encodeMapValue(Object value) {
try {
return codec.getMapValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}

@ -0,0 +1,174 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
/**
* RLocalCachedMap options object. Used to specify RLocalCachedMap settings.
*
* @author Nikita Koksharov
*
*/
public class LocalCachedMapOptions {
public enum EvictionPolicy {NONE, LRU, LFU};
private boolean invalidateEntryOnChange;
private EvictionPolicy evictionPolicy;
private int cacheSize;
private long timeToLiveInMillis;
private long maxIdleInMillis;
private LocalCachedMapOptions() {
}
protected LocalCachedMapOptions(LocalCachedMapOptions copy) {
this.invalidateEntryOnChange = copy.invalidateEntryOnChange;
this.evictionPolicy = copy.evictionPolicy;
this.cacheSize = copy.cacheSize;
this.timeToLiveInMillis = copy.timeToLiveInMillis;
this.maxIdleInMillis = copy.maxIdleInMillis;
}
/**
* Creates a new instance of LocalCachedMapOptions with default options.
* <p/>
* This is equivalent to:
* <pre>
* new LocalCachedMapOptions()
* .cacheSize(0).timeToLive(0).maxIdle(0)
* .evictionPolicy(EvictionPolicy.NONE)
* .invalidateEntryOnChange(true);
* </pre>
*/
public static LocalCachedMapOptions defaults() {
return new LocalCachedMapOptions()
.cacheSize(0).timeToLive(0).maxIdle(0)
.evictionPolicy(EvictionPolicy.NONE)
.invalidateEntryOnChange(true);
}
public boolean isInvalidateEntryOnChange() {
return invalidateEntryOnChange;
}
public EvictionPolicy getEvictionPolicy() {
return evictionPolicy;
}
public int getCacheSize() {
return cacheSize;
}
public long getTimeToLiveInMillis() {
return timeToLiveInMillis;
}
public long getMaxIdleInMillis() {
return maxIdleInMillis;
}
/**
* Sets cache size. If size is <code>0</code> then cache is unbounded.
*
* @param cacheSize
* @return
*/
public LocalCachedMapOptions cacheSize(int cacheSize) {
this.cacheSize = cacheSize;
return this;
}
/**
* Sets entry invalidation behavior.
*
* @param value - if <code>true</code> then invalidation message which removes corresponding entry from cache
* will be sent to all other RLocalCachedMap instances on each entry update/remove operation.
* if <code>false</code> then invalidation message won't be sent
* @return
*/
public LocalCachedMapOptions invalidateEntryOnChange(boolean value) {
this.invalidateEntryOnChange = value;
return this;
}
/**
* Sets eviction policy.
*
* @param evictionPolicy
* <p><code>LRU</code> - uses cache with LRU (least recently used) eviction policy.
* <p><code>LFU</code> - uses cache with LFU (least frequently used) eviction policy.
* <p><code>NONE</code> - doesn't use eviction policy, but timeToLive and maxIdleTime params are still working.
* @return
*/
public LocalCachedMapOptions evictionPolicy(EvictionPolicy evictionPolicy) {
if (evictionPolicy == null) {
throw new NullPointerException("evictionPolicy can't be null");
}
this.evictionPolicy = evictionPolicy;
return this;
}
/**
* Sets time to live in milliseconds for each map entry in cache.
* If value equals to <code>0<code> then timeout is not applied
*
* @param timeToLiveInMillis
* @return
*/
public LocalCachedMapOptions timeToLive(long timeToLiveInMillis) {
this.timeToLiveInMillis = timeToLiveInMillis;
return this;
}
/**
* Sets time to live for each map entry in cache.
* If value equals to <code>0<code> then timeout is not applied
*
* @param timeToLive
* @param timeUnit
* @return
*/
public LocalCachedMapOptions timeToLive(long timeToLive, TimeUnit timeUnit) {
return timeToLive(timeUnit.toMillis(timeToLive));
}
/**
* Sets max idle time in milliseconds for each map entry in cache.
* If value equals to <code>0<code> then timeout is not applied
*
* @param maxIdleInMillis
* @return
*/
public LocalCachedMapOptions maxIdle(long maxIdleInMillis) {
this.maxIdleInMillis = maxIdleInMillis;
return this;
}
/**
* Sets max idle time for each map entry in cache.
* If value equals to <code>0<code> then timeout is not applied
*
* @param maxIdleInMillis
* @return
*/
public LocalCachedMapOptions maxIdle(long maxIdle, TimeUnit timeUnit) {
return timeToLive(timeUnit.toMillis(maxIdle));
}
}

@ -0,0 +1,58 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Map;
/**
* Map object with entry cache support.
* <p>
* Each instance maintains local cache to achieve fast read operations.
* Suitable for maps which used mostly for read operations and network roundtrip delays are undesirable.
*
* @author Nikita Koksharov
*
* @param <K>
* @param <V>
*/
public interface RLocalCachedMap<K, V> extends Map<K, V>, RExpirable, RLocalCachedMapAsync<K, V> {
/**
* Associates the specified <code>value</code> with the specified <code>key</code>.
* <p>
* Works faster than <code>RLocalCachedMap.put</code> but not returning
* the previous value associated with <code>key</code>
*
* @param key
* @param value
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
boolean fastPut(K key, V value);
/**
* Removes <code>key</code> from map
* <p>
* Works faster than <code>RLocalCachedMap.remove</code> but not returning
* the value associated with <code>key</code>
*
* @param key
* @return <code>true</code> if key has been deleted.
* <code>false</code> if key doesn't exist.
*/
boolean fastRemove(Object key);
}

@ -0,0 +1,100 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
*
* @author Nikita Koksharov
*
* @param <K>
* @param <V>
*/
public interface RLocalCachedMapAsync<K, V> extends RExpirableAsync {
/**
* Returns map size
*
* @return
*/
RFuture<Integer> sizeAsync();
/**
* Checks if map contains the specified <code>key</code>
*
* @return <code>true</code> if map contains <code>key</code>.
* <code>false</code> if map doesn't contain <code>key</code>.
*/
RFuture<Boolean> containsKeyAsync(Object key);
/**
* Checks if map contains the specified <code>value</code>
*
* @return <code>true</code> if map contains <code>value</code>.
* <code>false</code> if map doesn't contain <code>value</code>.
*/
RFuture<Boolean> containsValueAsync(Object value);
/**
* Returns value associated with <code>key</code>
*
* @param key
* @return
*/
RFuture<V> getAsync(Object key);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>.
*
* @param key
* @param value
* @return previous value associated with <code>key</code>
*/
RFuture<V> putAsync(K key, V value);
/**
* Removes <code>key</code> from map.
*
* @param key
* @return removed value associated with <code>key</code>
*/
RFuture<V> removeAsync(K key);
/**
* Removes <code>key</code> from map
* <p>
* Works faster than <code>RLocalCachedMap.remove</code> but not returning
* the value associated with <code>key</code>
*
* @param key
* @return <code>true</code> if key has been deleted.
* <code>false</code> if key doesn't exist.
*/
RFuture<Boolean> fastRemoveAsync(K key);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>.
* <p>
* Works faster than <code>RLocalCachedMap.put</code> but not returning
* the previous value associated with <code>key</code>
*
* @param key
* @param value
* @return <code>true</code> if key is a new key in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
RFuture<Boolean> fastPutAsync(K key, V value);
}

@ -207,6 +207,27 @@ public interface RedissonClient {
*/
<K, V> RListMultimapCache<K, V> getListMultimapCache(String name, Codec codec);
/**
* Returns local cached map instance by name.
* Configured by parameters of options-object.
*
* @param name
* @param options
* @return
*/
<K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, LocalCachedMapOptions options);
/**
* Returns local cached map instance by name
* using provided codec. Configured by parameters of options-object.
*
* @param name
* @param codec
* @param options
* @return
*/
<K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions options);
/**
* Returns map instance by name.
*

@ -25,6 +25,19 @@ public class Hash {
private Hash() {
}
public static byte[] hash(byte[] objectState) {
long h1 = LongHashFunction.farmUo().hashBytes(objectState);
long h2 = LongHashFunction.xx_r39().hashBytes(objectState);
ByteBuf buf = Unpooled.buffer((2 * Long.SIZE) / Byte.SIZE).writeLong(h1).writeLong(h2);
try {
return buf.array();
} finally {
buf.release();
}
}
public static String hashToBase64(byte[] objectState) {
long h1 = LongHashFunction.farmUo().hashBytes(objectState);

@ -0,0 +1,249 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.RedissonLocalCachedMap.CacheKey;
import org.redisson.RedissonLocalCachedMap.CacheValue;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.RLocalCachedMap;
import org.redisson.misc.Cache;
import mockit.Deencapsulation;
public class RedissonLocalCachedMapTest extends BaseTest {
// @Test
public void testPerf() {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(100000).invalidateEntryOnChange(true);
Map<String, Integer> map = redisson.getLocalCachedMap("test", options);
// Map<String, Integer> map = redisson.getMap("test");
for (int i = 0; i < 100000; i++) {
map.put("" + i, i);
}
long s = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
for (int j = 0; j < 100000; j++) {
map.get("" + j);
}
}
System.out.println(System.currentTimeMillis() - s);
}
@Test
public void testInvalidationOnUpdate() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(true);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
RLocalCachedMap<String, Integer> map2 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", 1);
map1.put("2", 2);
assertThat(map2.get("1")).isEqualTo(1);
assertThat(map2.get("2")).isEqualTo(2);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
map1.put("1", 3);
map2.put("2", 4);
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(0);
assertThat(cache2.size()).isEqualTo(0);
}
@Test
public void testNoInvalidationOnUpdate() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(false);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
RLocalCachedMap<String, Integer> map2 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", 1);
map1.put("2", 2);
assertThat(map2.get("1")).isEqualTo(1);
assertThat(map2.get("2")).isEqualTo(2);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
map1.put("1", 3);
map2.put("2", 4);
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
}
@Test
public void testNoInvalidationOnRemove() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(false);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
RLocalCachedMap<String, Integer> map2 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", 1);
map1.put("2", 2);
assertThat(map2.get("1")).isEqualTo(1);
assertThat(map2.get("2")).isEqualTo(2);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
map1.remove("1");
map2.remove("2");
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(1);
assertThat(cache2.size()).isEqualTo(1);
}
@Test
public void testInvalidationOnRemove() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(true);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
RLocalCachedMap<String, Integer> map2 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", 1);
map1.put("2", 2);
assertThat(map2.get("1")).isEqualTo(1);
assertThat(map2.get("2")).isEqualTo(2);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
map1.remove("1");
map2.remove("2");
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(0);
assertThat(cache2.size()).isEqualTo(0);
}
@Test
public void testLFU() {
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5));
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put("12", 1);
map.put("14", 2);
map.put("15", 3);
map.put("16", 4);
map.put("17", 5);
map.put("18", 6);
assertThat(cache.size()).isEqualTo(5);
assertThat(map.size()).isEqualTo(6);
assertThat(map.keySet()).containsOnly("12", "14", "15", "16", "17", "18");
assertThat(map.values()).containsOnly(1, 2, 3, 4, 5, 6);
}
@Test
public void testLRU() {
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LRU).cacheSize(5));
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put("12", 1);
map.put("14", 2);
map.put("15", 3);
map.put("16", 4);
map.put("17", 5);
map.put("18", 6);
assertThat(cache.size()).isEqualTo(5);
assertThat(map.size()).isEqualTo(6);
assertThat(map.keySet()).containsOnly("12", "14", "15", "16", "17", "18");
assertThat(map.values()).containsOnly(1, 2, 3, 4, 5, 6);
}
@Test
public void testSize() {
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put("12", 1);
map.put("14", 2);
map.put("15", 3);
assertThat(cache.size()).isEqualTo(3);
assertThat(map.size()).isEqualTo(3);
}
@Test
public void testPut() {
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
map.put("12", 1);
map.put("14", 2);
map.put("15", 3);
Deencapsulation.setField(map, "map", null);
assertThat(map.get("12")).isEqualTo(1);
assertThat(map.get("14")).isEqualTo(2);
assertThat(map.get("15")).isEqualTo(3);
}
@Test
public void testRemove() {
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put("12", 1);
assertThat(cache.size()).isEqualTo(1);
assertThat(map.remove("12")).isEqualTo(1);
assertThat(cache.size()).isEqualTo(0);
assertThat(map.remove("14")).isNull();
}
@Test
public void testFastRemoveAsync() throws InterruptedException, ExecutionException {
RLocalCachedMap<Integer, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
map.put(1, 3);
map.put(7, 8);
assertThat(map.fastRemoveAsync(1).get()).isTrue();
assertThat(map.fastRemoveAsync(2).get()).isFalse();
assertThat(map.size()).isEqualTo(1);
}
@Test
public void testFastPut() {
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
Assert.assertTrue(map.fastPut("1", 2));
assertThat(map.get("1")).isEqualTo(2);
Assert.assertFalse(map.fastPut("1", 3));
assertThat(map.get("1")).isEqualTo(3);
Assert.assertEquals(1, map.size());
}
}
Loading…
Cancel
Save