refactoring

pull/968/head
Nikita 8 years ago
parent dde5ac16ba
commit b0150c14fe

@ -45,6 +45,8 @@ import org.redisson.api.listener.MessageListener;
import org.redisson.cache.Cache;
import org.redisson.cache.LFUCacheMap;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.LocalCachedMapClear;
import org.redisson.cache.LocalCachedMapInvalidate;
import org.redisson.cache.NoneCacheMap;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.codec.ByteArrayCodec;
@ -79,34 +81,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private static final Logger log = LoggerFactory.getLogger(RedissonLocalCachedMap.class);
public static class LocalCachedMapClear implements Serializable {
}
public static class LocalCachedMapInvalidate implements Serializable {
private byte[] excludedId;
private byte[][] keyHashes;
public LocalCachedMapInvalidate() {
}
public LocalCachedMapInvalidate(byte[] excludedId, byte[]... keyHashes) {
super();
this.keyHashes = keyHashes;
this.excludedId = excludedId;
}
public byte[] getExcludedId() {
return excludedId;
}
public byte[][] getKeyHashes() {
return keyHashes;
}
}
public static class CacheKey implements Serializable {
private final byte[] keyHash;
@ -344,6 +318,8 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public RFuture<Boolean> containsKeyAsync(Object key) {
checkKey(key);
CacheKey cacheKey = toCacheKey(key);
if (!cache.containsKey(cacheKey)) {
return super.containsKeyAsync(key);
@ -353,6 +329,8 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public RFuture<Boolean> containsValueAsync(Object value) {
checkValue(value);
CacheValue cacheValue = new CacheValue(null, value);
if (!cache.containsValue(cacheValue)) {
return super.containsValueAsync(value);
@ -362,9 +340,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public RFuture<V> getAsync(final Object key) {
if (key == null) {
throw new NullPointerException();
}
checkKey(key);
final CacheKey cacheKey = toCacheKey(key);
CacheValue cacheValue = cache.get(cacheKey);
@ -414,7 +390,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public RFuture<V> putOperationAsync(K key, V value) {
protected RFuture<V> putOperationAsync(K key, V value) {
byte[] mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
@ -424,13 +400,13 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ "if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "end; "
+ "return v; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
@ -1058,7 +1034,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
});
return future;
}
@Override
public RFuture<Collection<V>> readAllValuesAsync() {
final List<V> result = new ArrayList<V>();
@ -1315,11 +1291,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "if ARGV[3] == '2' then "
+ "end;"
+ "if ARGV[3] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "end;"
+ "return redis.call('hdel', KEYS[1], ARGV[1]) "
+ "else "
+ "return 0 "

@ -202,7 +202,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return newSucceededFuture(Collections.<K, V>emptyMap());
}
RFuture<Map<K, V>> future = getAllValuesAsync(keys);
RFuture<Map<K, V>> future = getAllOperationAsync(keys);
if (hasNoLoader()) {
return future;
}
@ -238,7 +238,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return options == null || options.getLoader() == null;
}
protected RFuture<Map<K, V>> getAllValuesAsync(final Set<K> keys) {
protected RFuture<Map<K, V>> getAllOperationAsync(final Set<K> keys) {
List<Object> args = new ArrayList<Object>(keys.size() + 1);
args.add(getName());
args.addAll(keys);

@ -97,8 +97,6 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
private int cacheSize;
private long timeToLiveInMillis;
private long maxIdleInMillis;
private MapLoader<K, V> mapLoader;
private MapWriter<K, V> mapWriter;
private LocalCachedMapOptions() {
}

@ -93,7 +93,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
if (entry == null) {
return false;
}
if (entry.isExpired()) {
if (isValueExpired(entry)) {
if (map.remove(key, entry)) {
onValueRemove(entry);
return false;
@ -103,6 +103,18 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
return true;
}
private boolean isValueExpired(CachedValue<K, V> entry) {
if (entry.isExpired()) {
return true;
}
if (entry.getValue() instanceof ExpirableValue) {
if (((ExpirableValue) entry.getValue()).isExpired()) {
return true;
}
}
return false;
}
/*
* (non-Javadoc)
* @see java.util.Map#containsValue(java.lang.Object)
@ -116,7 +128,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
for (Map.Entry<K, CachedValue<K, V>> entry : map.entrySet()) {
CachedValue<K, V> cachedValue = entry.getValue();
if (cachedValue.getValue().equals(value)) {
if (cachedValue.isExpired()) {
if (isValueExpired(cachedValue)) {
if (map.remove(cachedValue.getKey(), cachedValue)) {
onValueRemove(cachedValue);
}
@ -143,7 +155,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
if (entry == null) {
return null;
}
if (entry.isExpired()) {
if (isValueExpired(entry)) {
if (map.remove(key, entry)) {
onValueRemove(entry);
return null;
@ -179,7 +191,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
CachedValue<K, V> prevCachedValue = map.put(key, entry);
if (prevCachedValue != null) {
onValueRemove(prevCachedValue);
if (!prevCachedValue.isExpired()) {
if (!isValueExpired(prevCachedValue)) {
return (V) prevCachedValue.getValue();
}
}
@ -197,7 +209,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
boolean removed = false;
// TODO optimize
for (CachedValue<K, V> value : map.values()) {
if (value.isExpired()) {
if (isValueExpired(value)) {
if (map.remove(value.getKey(), value)) {
onValueRemove(value);
removed = true;
@ -235,7 +247,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
CachedValue<K, V> entry = map.remove(key);
if (entry != null) {
onValueRemove(entry);
if (!entry.isExpired()) {
if (!isValueExpired(entry)) {
return (V) entry.getValue();
}
}
@ -307,7 +319,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
mapEntry = null;
while (keyIterator.hasNext()) {
Map.Entry<K, CachedValue<K, V>> entry = keyIterator.next();
if (entry.getValue().isExpired()) {
if (isValueExpired(entry.getValue())) {
continue;
}
mapEntry = entry;

@ -18,8 +18,7 @@ package org.redisson.cache;
/**
* Created by jribble on 2/20/17.
*/
public interface CachedValue<K, V> {
boolean isExpired();
public interface CachedValue<K, V> extends ExpirableValue {
K getKey();

@ -0,0 +1,27 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
/**
*
* @author Nikita Koksharov
*
*/
public interface ExpirableValue {
boolean isExpired();
}

@ -0,0 +1,28 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
@SuppressWarnings("serial")
public class LocalCachedMapClear implements Serializable {
}

@ -0,0 +1,48 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
@SuppressWarnings("serial")
public class LocalCachedMapInvalidate implements Serializable {
private byte[] excludedId;
private byte[][] keyHashes;
public LocalCachedMapInvalidate() {
}
public LocalCachedMapInvalidate(byte[] excludedId, byte[]... keyHashes) {
super();
this.keyHashes = keyHashes;
this.excludedId = excludedId;
}
public byte[] getExcludedId() {
return excludedId;
}
public byte[][] getKeyHashes() {
return keyHashes;
}
}

@ -35,21 +35,26 @@ public class StdCachedValue<K, V> implements CachedValue<K, V> {
this.ttl = ttl;
this.key = key;
this.maxIdleTime = maxIdleTime;
creationTime = System.currentTimeMillis();
lastAccess = creationTime;
if (ttl != 0 || maxIdleTime != 0) {
creationTime = System.currentTimeMillis();
lastAccess = creationTime;
}
}
@Override
public boolean isExpired() {
boolean result = false;
if (maxIdleTime == 0 && ttl == 0) {
return false;
}
long currentTime = System.currentTimeMillis();
if (ttl != 0 && creationTime + ttl < currentTime) {
result = true;
return true;
}
if (maxIdleTime != 0 && lastAccess + maxIdleTime < currentTime) {
result = true;
return true;
}
return result;
return false;
}
@Override

@ -8,6 +8,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.junit.Assert;
import org.junit.Test;
@ -26,7 +27,38 @@ import mockit.Deencapsulation;
public class RedissonLocalCachedMapTest extends BaseMapTest {
@Test
public abstract class InvalidationTest {
RLocalCachedMap<String, Integer> map1;
RLocalCachedMap<String, Integer> map2;
Cache<CacheKey, CacheValue> cache1;
Cache<CacheKey, CacheValue> cache2;
public void execute() throws InterruptedException {
LocalCachedMapOptions<String, Integer> options = LocalCachedMapOptions.<String, Integer>defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5);
map1 = redisson.getLocalCachedMap("test", options);
cache1 = Deencapsulation.getField(map1, "cache");
map2 = redisson.getLocalCachedMap("test", options);
cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", 1);
map1.put("2", 2);
assertThat(map2.get("1")).isEqualTo(1);
assertThat(map2.get("2")).isEqualTo(2);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
test();
}
public abstract void test() throws InterruptedException;
}
// @Test
public void testPerf() {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.NONE).cacheSize(100000).invalidateEntryOnChange(true);
Map<String, Integer> map = redisson.getLocalCachedMap("test", options);
@ -102,68 +134,34 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
@Test
public void testInvalidationOnClear() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
.evictionPolicy(EvictionPolicy.LFU)
.cacheSize(5);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
RLocalCachedMap<String, Integer> map2 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", 1);
map1.put("2", 2);
map2.put("3", 2);
map2.put("4", 2);
assertThat(map1.size()).isEqualTo(4);
assertThat(map2.size()).isEqualTo(4);
assertThat(map1.readAllEntrySet()).hasSize(4);
assertThat(map2.readAllEntrySet()).hasSize(4);
assertThat(cache1.size()).isEqualTo(4);
assertThat(cache2.size()).isEqualTo(4);
map1.clear();
Thread.sleep(50);
assertThat(cache1.size()).isZero();
assertThat(cache2.size()).isZero();
assertThat(map1.size()).isZero();
assertThat(map2.size()).isZero();
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
map1.clear();
Thread.sleep(50);
assertThat(cache1.size()).isZero();
assertThat(cache2.size()).isZero();
assertThat(map1.size()).isZero();
assertThat(map2.size()).isZero();
}
}.execute();
}
@Test
public void testInvalidationOnUpdate() throws InterruptedException {
LocalCachedMapOptions<String, Integer> options = LocalCachedMapOptions.<String, Integer>defaults()
.evictionPolicy(EvictionPolicy.LFU)
.cacheSize(5);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
RLocalCachedMap<String, Integer> map2 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", 1);
map1.put("2", 2);
assertThat(map2.get("1")).isEqualTo(1);
assertThat(map2.get("2")).isEqualTo(2);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
map1.put("1", 3);
map2.put("2", 4);
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(1);
assertThat(cache2.size()).isEqualTo(1);
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
map1.put("1", 3);
map2.put("2", 4);
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(1);
assertThat(cache2.size()).isEqualTo(1);
}
}.execute();
}
@Test
@ -191,7 +189,7 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
map1.put("1", 3);
map2.put("2", 4);
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
}
@ -225,31 +223,20 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(cache1.size()).isEqualTo(1);
assertThat(cache2.size()).isEqualTo(1);
}
@Test
public void testInvalidationOnRemove() throws InterruptedException {
LocalCachedMapOptions<String, Integer> options = LocalCachedMapOptions.<String, Integer>defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
RLocalCachedMap<String, Integer> map2 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", 1);
map1.put("2", 2);
assertThat(map2.get("1")).isEqualTo(1);
assertThat(map2.get("2")).isEqualTo(2);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
map1.remove("1");
map2.remove("2");
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(0);
assertThat(cache2.size()).isEqualTo(0);
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
map1.remove("1");
map2.remove("2");
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(0);
assertThat(cache2.size()).isEqualTo(0);
}
}.execute();
}
@Test
@ -301,6 +288,21 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(map.size()).isEqualTo(3);
}
@Test
public void testInvalidationOnPut() throws InterruptedException {
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
map1.put("1", 10);
map1.put("2", 20);
Thread.sleep(50);
assertThat(cache1).hasSize(2);
assertThat(cache2).isEmpty();
}
}.execute();
}
@Test
public void testPut() {
@ -347,6 +349,25 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(filtered1).isEqualTo(expectedMap);
}
@Test
public void testInvalidationOnPutAll() throws InterruptedException {
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
Map<String, Integer> entries = new HashMap<>();
entries.put("1", 10);
entries.put("2", 20);
map1.putAll(entries);
Thread.sleep(50);
assertThat(cache1).hasSize(2);
assertThat(cache2).isEmpty();
}
}.execute();
}
@Test
public void testPutAll() throws InterruptedException {
Map<Integer, String> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
@ -458,6 +479,23 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(cache.size()).isEqualTo(2);
}
@Test
public void testInvalidationOnRemoveValue() throws InterruptedException {
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
map1.remove("1", 1);
map1.remove("2", 2);
Thread.sleep(50);
assertThat(cache1).isEmpty();
assertThat(cache2).isEmpty();
}
}.execute();
}
@Test
public void testRemoveValue() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults());
@ -491,6 +529,23 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(cache.size()).isEqualTo(1);
}
@Test
public void testInvalidationOnReplaceOldValue() throws InterruptedException {
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
map1.replace("1", 1, 10);
map1.replace("2", 2, 20);
Thread.sleep(50);
assertThat(cache1).hasSize(2);
assertThat(cache2).isEmpty();
}
}.execute();
}
@Test
public void testReplaceOldValueFail() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
@ -522,6 +577,22 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(cache.size()).isEqualTo(1);
}
@Test
public void testInvalidationOnReplaceValue() throws InterruptedException {
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
map1.replace("1", 10);
map1.replace("2", 20);
Thread.sleep(50);
assertThat(cache1).hasSize(2);
assertThat(cache2).isEmpty();
}
}.execute();
}
@Test
public void testReplaceValue() {
RMap<SimpleKey, SimpleValue> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
@ -556,17 +627,18 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
@Test
public void testFastRemoveAsync() throws InterruptedException, ExecutionException {
RMap<Integer, Integer> map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults());
Cache<CacheKey, CacheValue> cache = Deencapsulation.getField(map, "cache");
map.put(1, 3);
map.put(3, 5);
map.put(4, 6);
map.put(7, 8);
assertThat(map.fastRemoveAsync(1, 3, 7).get()).isEqualTo(3);
assertThat(cache.size()).isEqualTo(1);
assertThat(map.size()).isEqualTo(1);
public void testInvalidationOnFastRemove() throws InterruptedException {
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
map1.fastRemove("1", "2", "3");
Thread.sleep(50);
assertThat(cache1).isEmpty();
assertThat(cache2).isEmpty();
}
}.execute();
}
@Test
@ -606,6 +678,22 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(map.fastRemove("test")).isZero();
}
@Test
public void testInvalidationOnFastPut() throws InterruptedException {
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
map1.fastPut("1", 10);
map1.fastPut("2", 20);
Thread.sleep(50);
assertThat(cache1).hasSize(2);
assertThat(cache2).isEmpty();
}
}.execute();
}
@Test
public void testFastPut() {

Loading…
Cancel
Save