Added binary codec for messages used in LocalCachedMap object #1117

pull/1124/head
Nikita 7 years ago
parent 146d87c7e5
commit aef7c49482

@ -48,6 +48,7 @@ import org.redisson.cache.LFUCacheMap;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.LocalCachedMapClear;
import org.redisson.cache.LocalCachedMapInvalidate;
import org.redisson.cache.LocalCachedMessageCodec;
import org.redisson.cache.NoneCacheMap;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.codec.ByteArrayCodec;
@ -61,7 +62,6 @@ import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.Hash;
@ -182,7 +182,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private int invalidationListenerId;
private int invalidationStatusListenerId;
private volatile long lastInvalidate;
private Codec topicCodec;
private final Codec topicCodec = new LocalCachedMessageCodec();
protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(commandExecutor, name, redisson, options);
@ -212,26 +212,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
private void addListeners(String name, final LocalCachedMapOptions<K, V> options, final RedissonClient redisson) {
topicCodec = codec;
LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(new byte[] {1, 2, 3}, new byte[] {4, 5, 6});
ByteBuf buf = null;
try {
buf = topicCodec.getValueEncoder().encode(msg);
msg = (LocalCachedMapInvalidate) topicCodec.getValueDecoder().decode(buf, null);
if (!Arrays.equals(msg.getExcludedId(), new byte[] {1, 2, 3})
|| !Arrays.equals(msg.getKeyHashes()[0], new byte[] {4, 5, 6})) {
throw new IllegalArgumentException();
}
} catch (Exception e) {
log.warn("Defined {} codec doesn't encode service messages properly. Default JsonJacksonCodec used to encode messages!", topicCodec.getClass());
topicCodec = new JsonJacksonCodec();
} finally {
if (buf != null) {
buf.release();
}
}
invalidationTopic = new RedissonTopic<Object>(topicCodec, commandExecutor, suffixName(name, "topic"));
if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) {
@ -1390,14 +1370,8 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future;
}
@Override
protected ByteBuf encode(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
if (reference != null) {
value = reference;
}
}
try {
return topicCodec.getValueEncoder().encode(value);
} catch (IOException e) {

@ -0,0 +1,119 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
import java.io.IOException;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
/**
*
* @author Nikita Koksharov
*
*/
public class LocalCachedMessageCodec implements Codec {
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
byte type = buf.readByte();
if (type == 0x0) {
return new LocalCachedMapClear();
}
if (type == 0x1) {
byte[] excludedId = new byte[16];
buf.readBytes(excludedId);
int hashesCount = buf.readInt();
byte[][] hashes = new byte[hashesCount][];
for (int i = 0; i < hashesCount; i++) {
byte[] keyHash = new byte[16];
buf.readBytes(keyHash);
hashes[i] = keyHash;
}
return new LocalCachedMapInvalidate(excludedId, hashes);
}
throw new IllegalArgumentException("Can't parse packet");
}
};
private final Encoder encoder = new Encoder() {
@Override
public ByteBuf encode(Object in) throws IOException {
if (in instanceof LocalCachedMapClear) {
ByteBuf result = ByteBufAllocator.DEFAULT.buffer(1);
result.writeByte(0x0);
return result;
}
if (in instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate li = (LocalCachedMapInvalidate) in;
ByteBuf result = ByteBufAllocator.DEFAULT.buffer();
result.writeByte(0x1);
result.writeBytes(li.getExcludedId());
result.writeInt(li.getKeyHashes().length);
for (int i = 0; i < li.getKeyHashes().length; i++) {
result.writeBytes(li.getKeyHashes()[i]);
}
return result;
}
throw new IllegalArgumentException("Can't encode packet " + in);
}
};
public LocalCachedMessageCodec() {
}
@Override
public Decoder<Object> getMapValueDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapKeyEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
return encoder;
}
}
Loading…
Cancel
Save