From 69d7aff15ef89668952539bc107b2f13fa8590ce Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 20 Jul 2017 10:54:29 +0300 Subject: [PATCH] RLocalCachedMap doesn't work with non-json and non-binary codecs #976 --- .../org/redisson/RedissonLocalCachedMap.java | 44 ++++++++++++++++++- .../redisson/RedissonLocalCachedMapTest.java | 28 +++++++++++- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 2ca4e9a86..88f645330 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -15,6 +15,7 @@ */ package org.redisson; +import java.io.IOException; import java.io.Serializable; import java.math.BigDecimal; import java.util.AbstractCollection; @@ -60,13 +61,17 @@ import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.codec.JsonJacksonCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.misc.Hash; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.ThreadLocalRandom; @@ -178,6 +183,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private int invalidationListenerId; private int invalidationStatusListenerId; private volatile long lastInvalidate; + private Codec topicCodec; protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { super(commandExecutor, name, redisson, options); @@ -207,7 +213,28 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } private void addListeners(String name, final LocalCachedMapOptions options, final RedissonClient redisson) { - invalidationTopic = new RedissonTopic(commandExecutor, suffixName(name, "topic")); + topicCodec = codec; + + LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(new byte[] {1, 2, 3}, new byte[] {4, 5, 6}); + ByteBuf buf = null; + try { + byte[] s = topicCodec.getValueEncoder().encode(msg); + buf = Unpooled.wrappedBuffer(s); + msg = (LocalCachedMapInvalidate) topicCodec.getValueDecoder().decode(buf, null); + if (!Arrays.equals(msg.getExcludedId(), new byte[] {1, 2, 3}) + || !Arrays.equals(msg.getKeyHashes()[0], new byte[] {4, 5, 6})) { + throw new IllegalArgumentException(); + } + } catch (Exception e) { + log.warn("Defined {} codec doesn't encode service messages properly. Default JsonJacksonCodec used to encode messages!", topicCodec.getClass()); + topicCodec = new JsonJacksonCodec(); + } finally { + if (buf != null) { + buf.release(); + } + } + + invalidationTopic = new RedissonTopic(topicCodec, commandExecutor, suffixName(name, "topic")); if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) { return; @@ -1345,4 +1372,19 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return future; } + protected byte[] encode(Object value) { + if (commandExecutor.isRedissonReferenceSupportEnabled()) { + RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value); + if (reference != null) { + value = reference; + } + } + + try { + return topicCodec.getValueEncoder().encode(value); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + } diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index b1d57e481..d4f011772 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -8,7 +8,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; import org.junit.Assert; import org.junit.Test; @@ -22,6 +21,7 @@ import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy; import org.redisson.api.RLocalCachedMap; import org.redisson.api.RMap; import org.redisson.cache.Cache; +import org.redisson.client.codec.StringCodec; import mockit.Deencapsulation; @@ -148,6 +148,32 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { } }.execute(); } + + @Test + public void testInvalidationOnUpdateNonBinaryCodec() throws InterruptedException { + LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5); + RLocalCachedMap map1 = redisson.getLocalCachedMap("test", new StringCodec(), options); + Cache cache1 = Deencapsulation.getField(map1, "cache"); + + RLocalCachedMap map2 = redisson.getLocalCachedMap("test", new StringCodec(), options); + Cache cache2 = Deencapsulation.getField(map2, "cache"); + + map1.put("1", "1"); + map1.put("2", "2"); + + assertThat(map2.get("1")).isEqualTo("1"); + assertThat(map2.get("2")).isEqualTo("2"); + + assertThat(cache1.size()).isEqualTo(2); + assertThat(cache2.size()).isEqualTo(2); + + map1.put("1", "3"); + map2.put("2", "4"); + Thread.sleep(50); + + assertThat(cache1.size()).isEqualTo(1); + assertThat(cache2.size()).isEqualTo(1); + } @Test public void testInvalidationOnUpdate() throws InterruptedException {