From aef7c49482287b86d9504e34e96b0798eb2f4c88 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 27 Oct 2017 17:03:23 +0300 Subject: [PATCH] Added binary codec for messages used in LocalCachedMap object #1117 --- .../org/redisson/RedissonLocalCachedMap.java | 32 +---- .../cache/LocalCachedMessageCodec.java | 119 ++++++++++++++++++ 2 files changed, 122 insertions(+), 29 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 61f3bf0ba..40c6c15a8 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -48,6 +48,7 @@ import org.redisson.cache.LFUCacheMap; import org.redisson.cache.LRUCacheMap; import org.redisson.cache.LocalCachedMapClear; import org.redisson.cache.LocalCachedMapInvalidate; +import org.redisson.cache.LocalCachedMessageCodec; import org.redisson.cache.NoneCacheMap; import org.redisson.cache.ReferenceCacheMap; import org.redisson.client.codec.ByteArrayCodec; @@ -61,7 +62,6 @@ import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; -import org.redisson.codec.JsonJacksonCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.misc.Hash; @@ -182,7 +182,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private int invalidationListenerId; private int invalidationStatusListenerId; private volatile long lastInvalidate; - private Codec topicCodec; + private final Codec topicCodec = new LocalCachedMessageCodec(); protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { super(commandExecutor, name, redisson, options); @@ -212,26 +212,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } private void addListeners(String name, final LocalCachedMapOptions options, final RedissonClient redisson) { - topicCodec = codec; - - LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(new byte[] {1, 2, 3}, new byte[] {4, 5, 6}); - ByteBuf buf = null; - try { - buf = topicCodec.getValueEncoder().encode(msg); - msg = (LocalCachedMapInvalidate) topicCodec.getValueDecoder().decode(buf, null); - if (!Arrays.equals(msg.getExcludedId(), new byte[] {1, 2, 3}) - || !Arrays.equals(msg.getKeyHashes()[0], new byte[] {4, 5, 6})) { - throw new IllegalArgumentException(); - } - } catch (Exception e) { - log.warn("Defined {} codec doesn't encode service messages properly. Default JsonJacksonCodec used to encode messages!", topicCodec.getClass()); - topicCodec = new JsonJacksonCodec(); - } finally { - if (buf != null) { - buf.release(); - } - } - invalidationTopic = new RedissonTopic(topicCodec, commandExecutor, suffixName(name, "topic")); if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) { @@ -1390,14 +1370,8 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return future; } + @Override protected ByteBuf encode(Object value) { - if (commandExecutor.isRedissonReferenceSupportEnabled()) { - RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value); - if (reference != null) { - value = reference; - } - } - try { return topicCodec.getValueEncoder().encode(value); } catch (IOException e) { diff --git a/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java b/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java new file mode 100644 index 000000000..5e759a86b --- /dev/null +++ b/redisson/src/main/java/org/redisson/cache/LocalCachedMessageCodec.java @@ -0,0 +1,119 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.cache; + +import java.io.IOException; + +import org.redisson.client.codec.Codec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; + +/** + * + * @author Nikita Koksharov + * + */ +public class LocalCachedMessageCodec implements Codec { + + private final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + byte type = buf.readByte(); + if (type == 0x0) { + return new LocalCachedMapClear(); + } + + if (type == 0x1) { + byte[] excludedId = new byte[16]; + buf.readBytes(excludedId); + int hashesCount = buf.readInt(); + byte[][] hashes = new byte[hashesCount][]; + for (int i = 0; i < hashesCount; i++) { + byte[] keyHash = new byte[16]; + buf.readBytes(keyHash); + hashes[i] = keyHash; + } + return new LocalCachedMapInvalidate(excludedId, hashes); + } + + throw new IllegalArgumentException("Can't parse packet"); + } + }; + + private final Encoder encoder = new Encoder() { + + @Override + public ByteBuf encode(Object in) throws IOException { + if (in instanceof LocalCachedMapClear) { + ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1); + result.writeByte(0x0); + return result; + } + if (in instanceof LocalCachedMapInvalidate) { + LocalCachedMapInvalidate li = (LocalCachedMapInvalidate) in; + ByteBuf result = ByteBufAllocator.DEFAULT.buffer(); + result.writeByte(0x1); + result.writeBytes(li.getExcludedId()); + result.writeInt(li.getKeyHashes().length); + for (int i = 0; i < li.getKeyHashes().length; i++) { + result.writeBytes(li.getKeyHashes()[i]); + } + return result; + } + + throw new IllegalArgumentException("Can't encode packet " + in); + } + }; + + + public LocalCachedMessageCodec() { + } + + @Override + public Decoder getMapValueDecoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Encoder getMapValueEncoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Decoder getMapKeyDecoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Encoder getMapKeyEncoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Decoder getValueDecoder() { + return decoder; + } + + @Override + public Encoder getValueEncoder() { + return encoder; + } + +}