ReconnectionStrategy and SyncStrategy were added to LocalCachedMapOptions. #1157

pull/1204/head
Nikita 7 years ago
parent 65ac6f33db
commit 84f5629143

@ -35,7 +35,8 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RScoredSortedSet;
@ -48,6 +49,7 @@ import org.redisson.cache.LFUCacheMap;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.LocalCachedMapClear;
import org.redisson.cache.LocalCachedMapInvalidate;
import org.redisson.cache.LocalCachedMapUpdate;
import org.redisson.cache.LocalCachedMessageCodec;
import org.redisson.cache.NoneCacheMap;
import org.redisson.cache.ReferenceCacheMap;
@ -66,11 +68,11 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.ThreadLocalRandom;
@ -182,6 +184,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private int invalidationListenerId;
private int invalidationStatusListenerId;
private volatile long lastInvalidate;
private SyncStrategy syncStrategy;
private final Codec topicCodec = new LocalCachedMessageCodec();
protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
@ -197,11 +200,12 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private void init(String name, LocalCachedMapOptions<K, V> options, RedissonClient redisson, EvictionScheduler evictionScheduler) {
instanceId = generateId();
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE
|| options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) {
syncStrategy = options.getSyncStrategy();
if (options.getSyncStrategy() != SyncStrategy.NONE) {
invalidateEntryOnChange = 1;
}
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT) {
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
invalidateEntryOnChange = 2;
evictionScheduler.schedule(getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1));
}
@ -214,18 +218,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private void addListeners(String name, final LocalCachedMapOptions<K, V> options, final RedissonClient redisson) {
invalidationTopic = new RedissonTopic<Object>(topicCodec, commandExecutor, suffixName(name, "topic"));
if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) {
return;
}
if (options.getInvalidationPolicy() != InvalidationPolicy.ON_CHANGE) {
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
invalidationStatusListenerId = invalidationTopic.addListener(new BaseStatusListener() {
@Override
public void onSubscribe(String channel) {
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) {
if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) {
cache.clear();
}
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD
// check if instance has already been used
&& lastInvalidate > 0) {
@ -271,26 +271,51 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
});
}
invalidationListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapClear) {
cache.clear();
}
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
if (options.getSyncStrategy() != SyncStrategy.NONE) {
invalidationListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapClear) {
cache.clear();
}
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
}
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT) {
if (msg instanceof LocalCachedMapUpdate) {
LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg;
for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) {
ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey());
ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue());
try {
CacheKey cacheKey = toCacheKey(keyBuf);
Object key = codec.getMapKeyDecoder().decode(keyBuf, null);
Object value = codec.getMapValueDecoder().decode(valueBuf, null);
cache.put(cacheKey, new CacheValue(key, value));
} catch (IOException e) {
log.error("Can't decode map entry", e);
} finally {
keyBuf.release();
valueBuf.release();
}
}
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
lastInvalidate = System.currentTimeMillis();
}
}
}
});
});
}
}
protected Cache<CacheKey, CacheValue> createCache(LocalCachedMapOptions<K, V> options) {
@ -401,14 +426,15 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
protected RFuture<V> putOperationAsync(K key, V value) {
ByteBuf mapKey = encodeMapKey(key);
ByteBuf mapValue = encodeMapKey(value);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
ByteBuf msg = createSyncMessage(mapKey, mapValue, cacheKey);
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
@ -416,30 +442,36 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "end; "
+ "return v; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
mapKey, encodeMapValue(value), msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
}
protected ByteBuf createSyncMessage(ByteBuf mapKey, ByteBuf mapValue, CacheKey cacheKey) {
if (syncStrategy == SyncStrategy.UPDATE) {
return encode(new LocalCachedMapUpdate(mapKey, mapValue));
}
return encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
}
@Override
protected RFuture<Boolean> fastPutOperationAsync(K key, V value) {
ByteBuf encodedKey = encodeMapKey(key);
ByteBuf encodedValue = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(encodedKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey);
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ "if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
"if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ "return 0; "
+ "end; "
+ "return 1; ",
@ -957,15 +989,33 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
params.add(map.size()*2);
byte[][] hashes = new byte[map.size()][];
int i = 0;
int payloadSize = 0;
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
ByteBuf mapKey = encodeMapKey(t.getKey());
payloadSize += mapKey.readableBytes();
ByteBuf mapValue = encodeMapValue(t.getValue());
payloadSize += mapValue.readableBytes();
params.add(mapKey);
params.add(mapValue);
CacheKey cacheKey = toCacheKey(mapKey);
hashes[i] = cacheKey.getKeyHash();
i++;
}
ByteBuf msgEncoded;
if (syncStrategy == SyncStrategy.UPDATE) {
List<LocalCachedMapUpdate.Entry> entries = new ArrayList<LocalCachedMapUpdate.Entry>();
for (int j = 2; j < params.size(); j += 2) {
ByteBuf key = (ByteBuf) params.get(j);
ByteBuf value = (ByteBuf) params.get(j+1);
entries.add(new LocalCachedMapUpdate.Entry(key, value));
}
msgEncoded = encode(new LocalCachedMapUpdate(entries));
} else {
msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, hashes));
}
if (invalidateEntryOnChange == 2) {
long time = System.currentTimeMillis();
@ -973,11 +1023,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] entryId = generateLogEntryId(hash);
params.add(time);
params.add(entryId);
payloadSize += entryId.length + 8;
}
}
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, hashes));
params.add(msgEncoded);
payloadSize += msgEncoded.readableBytes();
log.debug("Payload size passed to putAll method: {}", payloadSize);
final RPromise<Void> result = newPromise();
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
@ -1212,8 +1265,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
ByteBuf msg = createSyncMessage(keyState, valueState, cacheKey);
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
@ -1262,8 +1314,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf newValueState = encodeMapValue(newValue);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
ByteBuf msg = createSyncMessage(keyState, newValueState, cacheKey);
return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "

@ -30,35 +30,65 @@ import org.redisson.api.map.MapWriter;
*/
public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
public enum InvalidationPolicy {
/**
* Various strategies to avoid stale objects in cache.
* Handle cases when map instance has been disconnected for a while.
*
*/
public enum ReconnectionStrategy {
/**
* No invalidation on map changes
*/
NONE,
/**
* Invalidate cache entry across all LocalCachedMap instances on map entry change.
* No reconnect handling.
*/
ON_CHANGE,
NONE,
/**
* Invalidate cache entry across all LocalCachedMap instances on map entry change.
* <p>
* Clear cache if LocalCachedMap instance has been disconnected for a while.
* It's applied to avoid stale objects in cache.
* Clear local cache if map instance has been disconnected for a while.
*/
ON_CHANGE_WITH_CLEAR_ON_RECONNECT,
CLEAR,
/**
* Invalidate cache entry across all LocalCachedMap instances on map entry change.
* <p>
* Store invalidated entry hash in invalidation log for 10 minutes.
* Cache keys for stored invalidated entry hashes will be removed
* if LocalCachedMap instance has been disconnected less than 10 minutes
* or whole cache will be cleaned otherwise.
* It's applied to avoid stale objects in cache.
*/
LOAD
}
public enum SyncStrategy {
/**
* No synchronizations on map changes.
*/
NONE,
/**
* Invalidate cache entry across all LocalCachedMap instances on map entry change.
*/
INVALIDATE,
/**
* Update cache entry across all LocalCachedMap instances on map entry change.
*/
UPDATE
}
/**
* Use {@link #syncStrategy(SyncStrategy)} and/or {@link #reconnectionStrategy(ReconnectionStrategy)} instead
*
*/
@Deprecated
public enum InvalidationPolicy {
NONE,
ON_CHANGE,
ON_CHANGE_WITH_CLEAR_ON_RECONNECT,
ON_CHANGE_WITH_LOAD_ON_RECONNECT
}
@ -92,7 +122,8 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
WEAK
};
private InvalidationPolicy invalidationPolicy;
private ReconnectionStrategy reconnectionStrategy;
private SyncStrategy syncStrategy;
private EvictionPolicy evictionPolicy;
private int cacheSize;
private long timeToLiveInMillis;
@ -102,7 +133,8 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
}
protected LocalCachedMapOptions(LocalCachedMapOptions<K, V> copy) {
this.invalidationPolicy = copy.invalidationPolicy;
this.reconnectionStrategy = copy.reconnectionStrategy;
this.syncStrategy = copy.syncStrategy;
this.evictionPolicy = copy.evictionPolicy;
this.cacheSize = copy.cacheSize;
this.timeToLiveInMillis = copy.timeToLiveInMillis;
@ -130,7 +162,8 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
return new LocalCachedMapOptions<K, V>()
.cacheSize(0).timeToLive(0).maxIdle(0)
.evictionPolicy(EvictionPolicy.NONE)
.invalidationPolicy(InvalidationPolicy.ON_CHANGE);
.reconnectionStrategy(ReconnectionStrategy.NONE)
.syncStrategy(SyncStrategy.INVALIDATE);
}
public EvictionPolicy getEvictionPolicy() {
@ -160,34 +193,55 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
return this;
}
public InvalidationPolicy getInvalidationPolicy() {
return invalidationPolicy;
public ReconnectionStrategy getReconnectionStrategy() {
return reconnectionStrategy;
}
public SyncStrategy getSyncStrategy() {
return syncStrategy;
}
public LocalCachedMapOptions<K, V> reconnectionStrategy(ReconnectionStrategy reconnectionStrategy) {
if (reconnectionStrategy == null) {
throw new NullPointerException("reconnectionStrategy can't be null");
}
this.reconnectionStrategy = reconnectionStrategy;
return this;
}
public LocalCachedMapOptions<K, V> syncStrategy(SyncStrategy syncStrategy) {
if (syncStrategy == null) {
throw new NullPointerException("syncStrategy can't be null");
}
this.syncStrategy = syncStrategy;
return this;
}
/**
* Sets entry invalidation policy.
*
* @param invalidationPolicy
* <p><code>NONE</code> - no invalidation applied.
* <p><code>ON_CHANGE</code> - invalidation message which removes corresponding entry from cache
* will be sent to all other RLocalCachedMap instances on each entry update/remove operation.
* <p><code>ON_CHANGE_WITH_CLEAR_ON_RECONNECT</code> - includes <code>ON_CHANGE</code> policy
* and clears local cache of current instance in case of reconnection to Redis.
* Use {@link #syncStrategy(SyncStrategy)} and/or {@link #reconnectionStrategy(ReconnectionStrategy)} instead
*
* @return LocalCachedMapOptions instance
*/
@Deprecated
public LocalCachedMapOptions<K, V> invalidationPolicy(InvalidationPolicy invalidationPolicy) {
this.invalidationPolicy = invalidationPolicy;
if (invalidationPolicy == InvalidationPolicy.ON_CHANGE) {
this.syncStrategy = SyncStrategy.INVALIDATE;
}
if (invalidationPolicy == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) {
this.syncStrategy = SyncStrategy.INVALIDATE;
this.reconnectionStrategy = ReconnectionStrategy.CLEAR;
}
if (invalidationPolicy == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT) {
this.syncStrategy = SyncStrategy.INVALIDATE;
this.reconnectionStrategy = ReconnectionStrategy.LOAD;
}
return this;
}
/**
* Sets entry invalidation behavior.
* Use {@link #syncStrategy(SyncStrategy)} and/or {@link #reconnectionStrategy(ReconnectionStrategy)} instead
*
* @param value - if <code>true</code> then invalidation message which removes corresponding entry from cache
* will be sent to all other RLocalCachedMap instances on each entry update/remove operation.
* if <code>false</code> then invalidation message won't be sent
* @return LocalCachedMapOptions instance
*/
@Deprecated
public LocalCachedMapOptions<K, V> invalidateEntryOnChange(boolean value) {

@ -0,0 +1,89 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
*/
@SuppressWarnings("serial")
public class LocalCachedMapUpdate implements Serializable {
public static class Entry {
private final byte[] key;
private final byte[] value;
public Entry(byte[] key, byte[] value) {
this.key = key;
this.value = value;
}
public Entry(ByteBuf keyBuf, ByteBuf valueBuf) {
key = new byte[keyBuf.readableBytes()];
keyBuf.getBytes(keyBuf.readerIndex(), key);
value = new byte[valueBuf.readableBytes()];
valueBuf.getBytes(valueBuf.readerIndex(), value);
}
public byte[] getKey() {
return key;
}
public byte[] getValue() {
return value;
}
}
private List<Entry> entries = new ArrayList<Entry>();
public LocalCachedMapUpdate() {
}
public LocalCachedMapUpdate(List<Entry> entries) {
super();
this.entries = entries;
}
public LocalCachedMapUpdate(ByteBuf keyBuf, ByteBuf valueBuf) {
byte[] key = new byte[keyBuf.readableBytes()];
keyBuf.getBytes(keyBuf.readerIndex(), key);
byte[] value = new byte[valueBuf.readableBytes()];
valueBuf.getBytes(valueBuf.readerIndex(), value);
entries = Collections.singletonList(new Entry(key, value));
}
public LocalCachedMapUpdate(byte[] key, byte[] value) {
entries = Collections.singletonList(new Entry(key, value));
}
public Collection<Entry> getEntries() {
return entries;
}
}

@ -16,6 +16,8 @@
package org.redisson.cache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
@ -52,6 +54,24 @@ public class LocalCachedMessageCodec implements Codec {
}
return new LocalCachedMapInvalidate(excludedId, hashes);
}
if (type == 0x2) {
List<LocalCachedMapUpdate.Entry> entries = new ArrayList<LocalCachedMapUpdate.Entry>();
while (true) {
int keyLen = buf.readInt();
byte[] key = new byte[keyLen];
buf.readBytes(key);
int valueLen = buf.readInt();
byte[] value = new byte[valueLen];
buf.readBytes(value);
entries.add(new LocalCachedMapUpdate.Entry(key, value));
if (!buf.isReadable()) {
break;
}
}
return new LocalCachedMapUpdate(entries);
}
throw new IllegalArgumentException("Can't parse packet");
}
@ -77,6 +97,20 @@ public class LocalCachedMessageCodec implements Codec {
}
return result;
}
if (in instanceof LocalCachedMapUpdate) {
LocalCachedMapUpdate li = (LocalCachedMapUpdate) in;
ByteBuf result = ByteBufAllocator.DEFAULT.buffer(256);
result.writeByte(0x2);
for (LocalCachedMapUpdate.Entry e : li.getEntries()) {
result.writeInt(e.getKey().length);
result.writeBytes(e.getKey());
result.writeInt(e.getValue().length);
result.writeBytes(e.getValue());
}
return result;
}
throw new IllegalArgumentException("Can't encode packet " + in);
}

@ -17,7 +17,8 @@ import org.redisson.RedissonMapTest.SimpleKey;
import org.redisson.RedissonMapTest.SimpleValue;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.cache.Cache;
@ -27,6 +28,39 @@ import mockit.Deencapsulation;
public class RedissonLocalCachedMapTest extends BaseMapTest {
public abstract class UpdateTest {
RLocalCachedMap<String, Integer> map1;
RLocalCachedMap<String, Integer> map2;
Cache<CacheKey, CacheValue> cache1;
Cache<CacheKey, CacheValue> cache2;
public void execute() throws InterruptedException {
LocalCachedMapOptions<String, Integer> options = LocalCachedMapOptions.<String, Integer>defaults().evictionPolicy(EvictionPolicy.LFU)
.syncStrategy(SyncStrategy.UPDATE)
.reconnectionStrategy(ReconnectionStrategy.CLEAR)
.cacheSize(5);
map1 = redisson.getLocalCachedMap("test2", options);
cache1 = Deencapsulation.getField(map1, "cache");
map2 = redisson.getLocalCachedMap("test2", options);
cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", 1);
map1.put("2", 2);
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
test();
}
public abstract void test() throws InterruptedException;
}
public abstract class InvalidationTest {
RLocalCachedMap<String, Integer> map1;
@ -176,7 +210,7 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
}
@Test
public void testInvalidationOnUpdate() throws InterruptedException {
public void testSyncOnUpdate() throws InterruptedException {
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
@ -188,6 +222,18 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(cache2.size()).isEqualTo(1);
}
}.execute();
new UpdateTest() {
@Override
public void test() throws InterruptedException {
map1.put("1", 3);
map2.put("2", 4);
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
}
}.execute();
}
@Test
@ -195,7 +241,7 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
LocalCachedMapOptions<String, Integer> options = LocalCachedMapOptions.<String, Integer>defaults()
.evictionPolicy(EvictionPolicy.LFU)
.cacheSize(5)
.invalidationPolicy(InvalidationPolicy.NONE);
.syncStrategy(SyncStrategy.NONE);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
@ -225,7 +271,7 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
LocalCachedMapOptions<String, Integer> options = LocalCachedMapOptions.<String, Integer>defaults()
.evictionPolicy(EvictionPolicy.LFU)
.cacheSize(5)
.invalidationPolicy(InvalidationPolicy.NONE);
.syncStrategy(SyncStrategy.NONE);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
@ -251,7 +297,7 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
}
@Test
public void testInvalidationOnRemove() throws InterruptedException {
public void testSyncOnRemove() throws InterruptedException {
new InvalidationTest() {
@Override
public void test() throws InterruptedException {
@ -263,6 +309,18 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(cache2.size()).isEqualTo(0);
}
}.execute();
new UpdateTest() {
@Override
public void test() throws InterruptedException {
map1.remove("1");
map2.remove("2");
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(0);
assertThat(cache2.size()).isEqualTo(0);
}
}.execute();
}
@Test
@ -699,7 +757,7 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
.evictionPolicy(EvictionPolicy.NONE)
.cacheSize(3)
.invalidationPolicy(InvalidationPolicy.NONE);
.syncStrategy(SyncStrategy.NONE);
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);
assertThat(map.fastRemove("test")).isZero();
}

Loading…
Cancel
Save