RLocalCachedMap doesn't work with non-json and non-binary codecs #976

pull/970/head
Nikita 8 years ago
parent 1b89afbd6f
commit 69d7aff15e

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.AbstractCollection;
@ -60,13 +61,17 @@ import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.ThreadLocalRandom;
@ -178,6 +183,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private int invalidationListenerId;
private int invalidationStatusListenerId;
private volatile long lastInvalidate;
private Codec topicCodec;
protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(commandExecutor, name, redisson, options);
@ -207,7 +213,28 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
private void addListeners(String name, final LocalCachedMapOptions<K, V> options, final RedissonClient redisson) {
invalidationTopic = new RedissonTopic<Object>(commandExecutor, suffixName(name, "topic"));
topicCodec = codec;
LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(new byte[] {1, 2, 3}, new byte[] {4, 5, 6});
ByteBuf buf = null;
try {
byte[] s = topicCodec.getValueEncoder().encode(msg);
buf = Unpooled.wrappedBuffer(s);
msg = (LocalCachedMapInvalidate) topicCodec.getValueDecoder().decode(buf, null);
if (!Arrays.equals(msg.getExcludedId(), new byte[] {1, 2, 3})
|| !Arrays.equals(msg.getKeyHashes()[0], new byte[] {4, 5, 6})) {
throw new IllegalArgumentException();
}
} catch (Exception e) {
log.warn("Defined {} codec doesn't encode service messages properly. Default JsonJacksonCodec used to encode messages!", topicCodec.getClass());
topicCodec = new JsonJacksonCodec();
} finally {
if (buf != null) {
buf.release();
}
}
invalidationTopic = new RedissonTopic<Object>(topicCodec, commandExecutor, suffixName(name, "topic"));
if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) {
return;
@ -1345,4 +1372,19 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future;
}
protected byte[] encode(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
if (reference != null) {
value = reference;
}
}
try {
return topicCodec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
}

@ -8,7 +8,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.junit.Assert;
import org.junit.Test;
@ -22,6 +21,7 @@ import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.cache.Cache;
import org.redisson.client.codec.StringCodec;
import mockit.Deencapsulation;
@ -149,6 +149,32 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
}.execute();
}
@Test
public void testInvalidationOnUpdateNonBinaryCodec() throws InterruptedException {
LocalCachedMapOptions<String, String> options = LocalCachedMapOptions.<String, String>defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5);
RLocalCachedMap<String, String> map1 = redisson.getLocalCachedMap("test", new StringCodec(), options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
RLocalCachedMap<String, String> map2 = redisson.getLocalCachedMap("test", new StringCodec(), options);
Cache<CacheKey, CacheValue> cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", "1");
map1.put("2", "2");
assertThat(map2.get("1")).isEqualTo("1");
assertThat(map2.get("2")).isEqualTo("2");
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
map1.put("1", "3");
map2.put("2", "4");
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(1);
assertThat(cache2.size()).isEqualTo(1);
}
@Test
public void testInvalidationOnUpdate() throws InterruptedException {
new InvalidationTest() {

Loading…
Cancel
Save