Merge pull request #5262 from dumbbell-5kg/master

Protobuf support
pull/5004/head^2
Nikita Koksharov 1 year ago committed by GitHub
commit ed33c41610
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -397,6 +397,33 @@
<version>3.5.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.8.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.8.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.16.3</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.16.3</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
</dependencies>
<build>

@ -0,0 +1,223 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.codec;
import com.fasterxml.jackson.databind.cfg.SerializerFactoryConfig;
import com.fasterxml.jackson.databind.ser.BasicSerializerFactory;
import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
public class ProtobufCodec extends BaseCodec {
private final Class<?> mapKeyClass;
private final Class<?> mapValueClass;
private final Class<?> valueClass;
//classes in blacklist will not be serialized using protobuf ,but instead will use blacklistCodec
private final Set<String> protobufBlacklist;
//default value is JsonJacksonCodec
private final Codec blacklistCodec;
public ProtobufCodec(Class<?> mapKeyClass, Class<?> mapValueClass) {
this(mapKeyClass, mapValueClass, null, null);
}
/**
* @param blacklistCodec classes in protobufBlacklist will use this codec
*/
public ProtobufCodec(Class<?> mapKeyClass, Class<?> mapValueClass, Codec blacklistCodec) {
this(mapKeyClass, mapValueClass, null, blacklistCodec);
}
public ProtobufCodec(Class<?> valueClass) {
this(null, null, valueClass, null);
}
/**
* @param blacklistCodec classes in protobufBlacklist will use this codec
*/
public ProtobufCodec(Class<?> valueClass, Codec blacklistCodec) {
this(null, null, valueClass, blacklistCodec);
}
private ProtobufCodec(Class<?> mapKeyClass, Class<?> mapValueClass, Class<?> valueClass, Codec blacklistCodec) {
this.mapKeyClass = mapKeyClass;
this.mapValueClass = mapValueClass;
this.valueClass = valueClass;
if (blacklistCodec == null) {
this.blacklistCodec = new JsonJacksonCodec();
} else {
if (blacklistCodec instanceof ProtobufCodec) {
//will loop infinitely when encode or decode
throw new IllegalArgumentException("BlacklistCodec can not be ProtobufCodec");
}
this.blacklistCodec = blacklistCodec;
}
protobufBlacklist = new HashSet<>();
protobufBlacklist.addAll(BasicSerializerFactoryConcreteGetter.getConcreteKeySet());
protobufBlacklist.add(ArrayList.class.getName());
protobufBlacklist.add(HashSet.class.getName());
protobufBlacklist.add(HashMap.class.getName());
}
public void addBlacklist(Class<?> clazz) {
protobufBlacklist.add(clazz.getName());
}
public void removeBlacklist(Class<?> clazz) {
protobufBlacklist.remove(clazz.getName());
}
@Override
public Decoder<Object> getValueDecoder() {
return createDecoder(valueClass, blacklistCodec.getValueDecoder());
}
@Override
public Encoder getValueEncoder() {
return createEncoder(valueClass, blacklistCodec.getValueEncoder());
}
@Override
public Decoder<Object> getMapValueDecoder() {
return createDecoder(mapValueClass, blacklistCodec.getMapValueDecoder());
}
@Override
public Encoder getMapValueEncoder() {
return createEncoder(mapValueClass, blacklistCodec.getMapValueEncoder());
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return createDecoder(mapKeyClass, blacklistCodec.getMapKeyDecoder());
}
@Override
public Encoder getMapKeyEncoder() {
return createEncoder(mapKeyClass, blacklistCodec.getMapKeyEncoder());
}
private Decoder<Object> createDecoder(Class<?> clazz, Decoder<Object> blacklistDecoder) {
if (clazz == null) {
throw new IllegalArgumentException("class to create protobuf decoder can not be null");
}
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
//use blacklistDecoder
if (protobufBlacklist.contains(clazz.getName())) {
return blacklistDecoder.decode(buf, state);
}
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
if (MessageLite.class.isAssignableFrom(clazz)) {
//native deserialize
try {
return clazz.getDeclaredMethod("parseFrom", byte[].class).invoke(clazz, bytes);
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
throw new RuntimeException(e);
}
} else {
//protostuff
return ProtostuffUtils.deserialize(bytes, clazz);
}
}
};
}
private Encoder createEncoder(Class<?> clazz, Encoder blacklistEncoder) {
if (clazz == null) {
throw new IllegalArgumentException("class to create protobuf encoder can not be null");
}
return new Encoder() {
@Override
public ByteBuf encode(Object in) throws IOException {
//use blacklistEncoder
if (protobufBlacklist.contains(clazz.getName())) {
return blacklistEncoder.encode(in);
}
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
if (MessageLite.class.isAssignableFrom(clazz)) {
//native serialize
out.writeBytes(((MessageLite) in).toByteArray());
} else {
//protostuff
out.writeBytes(ProtostuffUtils.serialize(in));
}
return out;
}
};
}
private static class ProtostuffUtils {
@SuppressWarnings("unchecked")
public static <T> byte[] serialize(T obj) {
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
return ProtostuffIOUtil.toByteArray(obj, RuntimeSchema.getSchema((Class<T>) obj.getClass()), buffer);
} finally {
buffer.clear();
}
}
public static <T> T deserialize(byte[] data, Class<T> clazz) {
Schema<T> schema = RuntimeSchema.getSchema(clazz);
T obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
}
}
private abstract static class BasicSerializerFactoryConcreteGetter extends BasicSerializerFactory {
protected BasicSerializerFactoryConcreteGetter(SerializerFactoryConfig config) {
super(config);
}
private static Set<String> getConcreteKeySet() {
Set<String> concreteKeySet = new HashSet<>();
if (_concrete != null && !_concrete.isEmpty()) {
concreteKeySet.addAll(_concrete.keySet());
}
if (_concreteLazy != null && !_concreteLazy.isEmpty()) {
concreteKeySet.addAll(_concreteLazy.keySet());
}
return concreteKeySet;
}
}
}

@ -3,15 +3,20 @@ package org.redisson;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import net.bytebuddy.utility.RandomString;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.RBucket;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.codec.*;
import org.redisson.codec.protobuf.nativeData.Proto2AllTypes;
import org.redisson.codec.protobuf.nativeData.Proto3AllTypes;
import org.redisson.codec.protobuf.protostuffData.StuffData;
import org.redisson.config.Config;
import java.io.IOException;
@ -28,10 +33,14 @@ public class RedissonCodecTest extends BaseTest {
private Codec fstCodec = new FstCodec();
private Codec snappyCodec = new SnappyCodec();
private Codec snappyCodecV2 = new SnappyCodecV2();
// private Codec msgPackCodec = new MsgPackJacksonCodec();
// private Codec msgPackCodec = new MsgPackJacksonCodec();
private Codec lz4Codec = new LZ4Codec();
private Codec jsonListOfStringCodec = new TypedJsonJacksonCodec(
new TypeReference<String>() {}, new TypeReference<List<String>>() {});
private Codec protobufV2Codec = new ProtobufCodec(String.class, Proto2AllTypes.AllTypes2.class);
private Codec protobufV3Codec = new ProtobufCodec(String.class, Proto3AllTypes.AllTypes3.class,new JsonJacksonCodec());
private Codec protobufStuffDataCodec = new ProtobufCodec( StuffData.class);
@Test
public void testLZ4() {
@ -140,6 +149,89 @@ public class RedissonCodecTest extends BaseTest {
test(redisson);
}
@Test
public void testProtobufV2() {
//native V2
Config config = createConfig();
config.setCodec(protobufV2Codec);
RedissonClient redisson = Redisson.create(config);
final Proto2AllTypes.AllTypes2 allTypes2 = Proto2AllTypes.AllTypes2.newBuilder()
.addDoubleType(1)
.addDoubleType(1.1)
.setFloatType(1.1f)
.setInt32Type(1)
.setInt64Type(1)
.setUint32Type(1)
.setUint64Type(1)
.setSint32Type(1)
.setSint64Type(1)
.setFixed32Type(1)
.setFixed64Type(1)
.setSfixed32Type(1)
.setSfixed64Type(1)
.setBoolType(true)
.setStringType("1")
.setBytesType(ByteString.copyFrom("1".getBytes()))
.build();
final RMap<String, Proto2AllTypes.AllTypes2> v2rMap = redisson.getMap("protobuf2Map");
v2rMap.put("V2",allTypes2);
final Proto2AllTypes.AllTypes2 getAllTypes2 = v2rMap.get("V2");
Assertions.assertEquals(allTypes2, getAllTypes2);
redisson.shutdown();
}
@Test
public void testProtobufV3() {
//native V3
Config config = createConfig();
config.setCodec(protobufV3Codec);
RedissonClient redisson = Redisson.create(config);
final Proto3AllTypes.AllTypes3 allTypes3 = Proto3AllTypes.AllTypes3.newBuilder()
.addDoubleType(1.1)
.addDoubleType(1.2)
.setFloatType(1.1f)
.setInt32Type(1)
.setInt64Type(1)
.setUint32Type(1)
.setUint64Type(1)
.setSint32Type(1)
.setSint64Type(1)
.setFixed32Type(1)
.setFixed64Type(1)
.setSfixed32Type(1)
.setSfixed64Type(1)
.setBoolType(true)
.setStringType("1")
.setBytesType(ByteString.copyFrom("1".getBytes()))
.build();
final RMap<String, Proto3AllTypes.AllTypes3> v3rMap = redisson.getMap("protobuf3Map");
v3rMap.put("V3",allTypes3);
final Proto3AllTypes.AllTypes3 getAllTypes3 = v3rMap.get("V3");
Assertions.assertEquals(allTypes3, getAllTypes3);
redisson.shutdown();
}
@Test
public void testProtostuff() {
//protostuff (a framework that bypasses the need to compile .proto files into .java file.)
Config config = createConfig();
config.setCodec(protobufStuffDataCodec);
RedissonClient redisson = Redisson.create(config);
final StuffData stuffData = new StuffData();
stuffData.setAge(18);
List<String> hobbies = new ArrayList<>();
hobbies.add("game");
hobbies.add("game");
stuffData.setHobbies(hobbies);
stuffData.setName("ccc");
RList<StuffData> protostuffList = redisson.getList("protostuffList");
protostuffList.add(stuffData);
final StuffData getStuffData = protostuffList.get(0);
Assertions.assertEquals(stuffData, getStuffData);
redisson.shutdown();
}
@Test
public void testKryo() {
Config config = createConfig();

@ -0,0 +1,103 @@
package org.redisson.codec.protobuf;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.client.protocol.Encoder;
import org.redisson.codec.Kryo5Codec;
import org.redisson.codec.ProtobufCodec;
import org.redisson.codec.protobuf.protostuffData.StuffData;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
public class ProtobufCodecTest {
@Test
public void testBlacklist() throws IOException {
Class<StuffData> stuffDataClass = StuffData.class;
ProtobufCodec protobufCodec = new ProtobufCodec(stuffDataClass);
Encoder valueEncoder = protobufCodec.getValueEncoder();
//classes in blacklist will not be serialized using protobuf ,but instead will use blacklistCodec
protobufCodec.addBlacklist(stuffDataClass);
final StuffData stuffData = getStuffData();
ByteBuf buf = valueEncoder.encode(stuffData);
byte[] jsonBytes = new byte[buf.readableBytes()];
buf.readBytes(jsonBytes);
Assertions.assertTrue(isValidJson(new String(jsonBytes)));
//classes not in blacklist will be serialized using protobuf
protobufCodec.removeBlacklist(stuffDataClass);
buf = valueEncoder.encode(stuffData);
byte[] protobufBytes = new byte[buf.readableBytes()];
buf.readBytes(protobufBytes);
StuffData desStuffData = deserializeProtobufBytes(protobufBytes, StuffData.class);
Assertions.assertEquals(stuffData, desStuffData);
}
@Test
public void testBlacklistCodec() throws IOException {
Class<StuffData> stuffDataClass = StuffData.class;
//default blacklistCodec is JsonJacksonCodec
ProtobufCodec protobufCodec = new ProtobufCodec(stuffDataClass);
protobufCodec.addBlacklist(stuffDataClass);
ByteBuf buf = protobufCodec.getValueEncoder().encode(getStuffData());
byte[] jsonBytes = new byte[buf.readableBytes()];
buf.readBytes(jsonBytes);
Assertions.assertTrue(isValidJson(new String(jsonBytes)));
//replace default blacklistCodec with Kryo5Codec
Kryo5Codec kryo5Codec = new Kryo5Codec();
protobufCodec = new ProtobufCodec(String.class, kryo5Codec);
LinkedHashSet<String> v11 = new LinkedHashSet<>();
v11.add("123");
ByteBuf v1 = protobufCodec.getValueEncoder().encode(v11);
LinkedHashSet<String> v11_1 = (LinkedHashSet<String>) kryo5Codec.getValueDecoder().decode(v1, null);
Assertions.assertTrue(v11_1.size() == 1 && v11_1.contains("123"));
//illegal blacklistCodec
Assertions.assertThrows(IllegalArgumentException.class
, () -> new ProtobufCodec(stuffDataClass, new ProtobufCodec(stuffDataClass))
, "BlacklistCodec can not be ProtobufCodec");
}
private <T> T deserializeProtobufBytes(byte[] data, Class<T> clazz) {
Schema<T> schema = RuntimeSchema.getSchema(clazz);
T obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
}
@NotNull
private StuffData getStuffData() {
final StuffData stuffData = new StuffData();
stuffData.setAge(18);
List<String> hobbies = new ArrayList<>();
hobbies.add("game");
hobbies.add("game");
stuffData.setHobbies(hobbies);
stuffData.setName("ccc");
return stuffData;
}
private boolean isValidJson(String jsonString) {
ObjectMapper objectMapper = new ObjectMapper();
try {
objectMapper.readTree(jsonString);
return true;
} catch (JsonProcessingException e) {
return false;
}
}
}

@ -0,0 +1,27 @@
syntax = "proto2";
package org.redisson.codec.protobuf.raw;
option java_package = "org.redisson.codec.protobuf.raw";
option java_outer_classname = "Proto2AllTypes";
message AllTypes2{
//types from https://protobuf.dev/programming-guides/proto2/
repeated double doubleType = 1;
optional float floatType = 2;
required int32 int32Type = 3;
optional int64 int64Type = 4;
optional uint32 uint32Type = 5;
optional uint64 uint64Type = 6;
optional sint32 sint32Type = 7;
optional sint64 sint64Type = 8;
optional fixed32 fixed32Type = 9;
optional fixed64 fixed64Type = 10;
optional sfixed32 sfixed32Type = 11;
optional sfixed64 sfixed64Type = 12;
optional bool boolType = 13;
optional string stringType = 14;
optional bytes bytesType = 15 ;
}

@ -0,0 +1,27 @@
syntax = "proto3";
package org.redisson.codec.protobuf.raw;
option java_package = "org.redisson.codec.protobuf.raw";
option java_outer_classname = "Proto3AllTypes";
message AllTypes3{
//types from https://protobuf.dev/programming-guides/proto3/
repeated double doubleType = 1;
optional float floatType = 2;
int32 int32Type = 3;
int64 int64Type = 4;
uint32 uint32Type = 5;
uint64 uint64Type = 6;
sint32 sint32Type = 7;
sint64 sint64Type = 8;
fixed32 fixed32Type = 9;
fixed64 fixed64Type = 10;
sfixed32 sfixed32Type = 11;
sfixed64 sfixed64Type = 12;
bool boolType = 13;
string stringType = 14;
bytes bytesType = 15 ;
}

@ -0,0 +1,51 @@
package org.redisson.codec.protobuf.protostuffData;
import java.util.List;
import java.util.Objects;
public class StuffData {
private String name;
private int age;
private List<String> hobbies;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public List<String> getHobbies() {
return hobbies;
}
public void setHobbies(List<String> hobbies) {
this.hobbies = hobbies;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StuffData stuffData = (StuffData) o;
if (age != stuffData.age) return false;
if (!Objects.equals(name, stuffData.name)) return false;
return Objects.equals(hobbies, stuffData.hobbies);
}
@Override
public int hashCode() {
return Objects.hash(name, age, hobbies);
}
}
Loading…
Cancel
Save