[changelog-json] Support a changelog JSON format to encode and decode changelogs
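
This patch adds a new module, flink-format-changelog-json, that (de)serializes a
changelog stream as JSON envelopes of the shape
{"data": {...}, "op": "+I" | "-U" | "+U" | "-D"}, so a changelog produced by a
CDC source can be written to an external system such as Kafka and read back with
its insert/update/delete semantics intact.

A usage sketch (hedged: the connector, topic, and server address below are
illustrative and not part of this patch; only the 'format' and 'changelog-json.*'
options are defined here):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogJsonUsageSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE product_changes (" +
                "  id INT," +
                "  name STRING," +
                "  weight FLOAT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +                      // assumed connector
                "  'topic' = 'products'," +                       // assumed topic
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'changelog-json'," +
                "  'changelog-json.ignore-parse-errors' = 'true'," +
                "  'changelog-json.timestamp-format.standard' = 'ISO-8601'" +
                ")");
    }
}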

release-1.0
Jark Wu 5 years ago
parent 97b95eda77
commit d70cbd61f0

flink-format-changelog-json/pom.xml
@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.alibaba.ververica</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-format-changelog-json</artifactId>
<name>flink-format-changelog-json</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<!-- table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- RowData (de)serialization schema testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- RowData encoder / input format testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonDeserializationSchema.java
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.Objects;
import static java.lang.String.format;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
/**
* Deserialization schema from changelog JSON to Flink Table/SQL internal data structure {@link RowData}.
*/
public class ChangelogJsonDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = -2084214292622004460L;
/** The deserializer to deserialize changelog JSON data. */
private final JsonRowDataDeserializationSchema jsonDeserializer;
/** TypeInformation of the produced {@link RowData}. */
private final TypeInformation<RowData> resultTypeInfo;
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
private final boolean ignoreParseErrors;
public ChangelogJsonDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean ignoreParseErrors,
TimestampFormat timestampFormatOption) {
this.resultTypeInfo = resultTypeInfo;
this.ignoreParseErrors = ignoreParseErrors;
this.jsonDeserializer = new JsonRowDataDeserializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
// the inner deserializer's result type is never used, so it is fine to pass in the outer result type
resultTypeInfo,
false, // failOnMissingField is already covered by ignoreParseErrors
ignoreParseErrors,
timestampFormatOption);
}
@Override
public RowData deserialize(byte[] message) throws IOException {
throw new RuntimeException(
"Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
}
@Override
public void deserialize(byte[] bytes, Collector<RowData> out) throws IOException {
try {
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(bytes);
GenericRowData data = (GenericRowData) row.getField(0);
String op = row.getString(1).toString();
RowKind rowKind = parseRowKind(op);
data.setRowKind(rowKind);
out.collect(data);
} catch (Throwable t) {
// a catch-all, so a single corrupt record cannot fail the job when parse errors are ignored
if (!ignoreParseErrors) {
throw new IOException(format(
"Corrupt Debezium JSON message '%s'.", new String(bytes)), t);
}
}
}
private static RowKind parseRowKind(String op) {
switch (op) {
case "+I":
return RowKind.INSERT;
case "-U":
return RowKind.UPDATE_BEFORE;
case "+U":
return RowKind.UPDATE_AFTER;
case "-D":
return RowKind.DELETE;
default:
throw new UnsupportedOperationException("Unsupported operation '" + op + "' for row kind.");
}
}
@Override
public boolean isEndOfStream(RowData rowData) {
return false;
}
@Override
public TypeInformation<RowData> getProducedType() {
return resultTypeInfo;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChangelogJsonDeserializationSchema that = (ChangelogJsonDeserializationSchema) o;
return ignoreParseErrors == that.ignoreParseErrors &&
Objects.equals(jsonDeserializer, that.jsonDeserializer) &&
Objects.equals(resultTypeInfo, that.resultTypeInfo);
}
@Override
public int hashCode() {
return Objects.hash(jsonDeserializer, resultTypeInfo, ignoreParseErrors);
}
private static RowType createJsonRowType(DataType databaseSchema) {
DataType payload = DataTypes.ROW(
DataTypes.FIELD("data", databaseSchema),
DataTypes.FIELD("op", DataTypes.STRING()));
return (RowType) payload.getLogicalType();
}
}
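
For illustration, a minimal decoding sketch (the two-field row type and the
sample message are assumptions for this example, mirroring the test data
further down):

import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

import java.nio.charset.StandardCharsets;

public class ChangelogJsonDecodeSketch {
    public static void main(String[] args) throws Exception {
        RowType rowType = (RowType) DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("name", DataTypes.STRING())).getLogicalType();
        ChangelogJsonDeserializationSchema schema = new ChangelogJsonDeserializationSchema(
                rowType, RowDataTypeInfo.of(rowType), false, TimestampFormat.SQL);
        byte[] message = "{\"data\":{\"id\":101,\"name\":\"scooter\"},\"op\":\"+I\"}"
                .getBytes(StandardCharsets.UTF_8);
        // unwraps the "data" field and stamps the emitted row with RowKind.INSERT
        schema.deserialize(message, new Collector<RowData>() {
            @Override
            public void collect(RowData row) {
                System.out.println(row.getRowKind() + ": " + row.getInt(0));
            }

            @Override
            public void close() {
            }
        });
    }
}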

flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonFormatFactory.java
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* Format factory for providing configured instances of changelog JSON to RowData {@link DeserializationSchema} and {@link SerializationSchema}.
*/
public class ChangelogJsonFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
public static final String IDENTIFIER = "changelog-json";
public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonOptions.IGNORE_PARSE_ERRORS;
public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
return new DecodingFormat<DeserializationSchema<RowData>>() {
@SuppressWarnings("unchecked")
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
return new ChangelogJsonDeserializationSchema(
rowType,
rowDataTypeInfo,
ignoreParseErrors,
timestampFormat);
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
}
};
}
@Override
public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
}
@Override
public SerializationSchema<RowData> createRuntimeEncoder(DynamicTableSink.Context context, DataType consumedDataType) {
final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new ChangelogJsonSerializationSchema(
rowType,
timestampFormat
);
}
};
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IGNORE_PARSE_ERRORS);
options.add(TIMESTAMP_FORMAT);
return options;
}
}

flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonSerializationSchema.java
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.util.Objects;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
/**
* Serialization schema from Flink Table/SQL internal data structure {@link RowData} to changelog JSON.
*/
public class ChangelogJsonSerializationSchema implements SerializationSchema<RowData> {
private static final long serialVersionUID = -3999450457829887684L;
private static final StringData OP_INSERT = StringData.fromString("+I");
private static final StringData OP_UPDATE_BEFORE = StringData.fromString("-U");
private static final StringData OP_UPDATE_AFTER = StringData.fromString("+U");
private static final StringData OP_DELETE = StringData.fromString("-D");
private final JsonRowDataSerializationSchema jsonSerializer;
/** Timestamp format specification which is used to serialize timestamps. */
private final TimestampFormat timestampFormat;
private transient GenericRowData reuse;
public ChangelogJsonSerializationSchema(RowType rowType, TimestampFormat timestampFormat) {
this.jsonSerializer = new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat);
this.timestampFormat = timestampFormat;
}
@Override
public void open(InitializationContext context) throws Exception {
this.reuse = new GenericRowData(2);
}
@Override
public byte[] serialize(RowData rowData) {
reuse.setField(0, rowData);
reuse.setField(1, stringifyRowKind(rowData.getRowKind()));
return jsonSerializer.serialize(reuse);
}
private static StringData stringifyRowKind(RowKind rowKind) {
switch (rowKind) {
case INSERT:
return OP_INSERT;
case UPDATE_BEFORE:
return OP_UPDATE_BEFORE;
case UPDATE_AFTER:
return OP_UPDATE_AFTER;
case DELETE:
return OP_DELETE;
default:
throw new UnsupportedOperationException("Unsupported operation '" + rowKind + "' for row kind.");
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChangelogJsonSerializationSchema that = (ChangelogJsonSerializationSchema) o;
return Objects.equals(jsonSerializer, that.jsonSerializer) &&
timestampFormat == that.timestampFormat;
}
@Override
public int hashCode() {
return Objects.hash(jsonSerializer, timestampFormat);
}
private static RowType createJsonRowType(DataType databaseSchema) {
DataType payload = DataTypes.ROW(
DataTypes.FIELD("data", databaseSchema),
DataTypes.FIELD("op", DataTypes.STRING()));
return (RowType) payload.getLogicalType();
}
}
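
The symmetric encoding direction, as a hedged sketch (same assumed two-field row
type; note that open() must run before serialize() so the reused wrapper row is
allocated; the tests below likewise pass null for the unused context):

import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import java.nio.charset.StandardCharsets;

public class ChangelogJsonEncodeSketch {
    public static void main(String[] args) throws Exception {
        RowType rowType = (RowType) DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("name", DataTypes.STRING())).getLogicalType();
        ChangelogJsonSerializationSchema schema =
                new ChangelogJsonSerializationSchema(rowType, TimestampFormat.SQL);
        schema.open(null); // allocates the reused two-field wrapper row
        GenericRowData row = GenericRowData.of(101, StringData.fromString("scooter"));
        row.setRowKind(RowKind.DELETE);
        // prints {"data":{"id":101,"name":"scooter"},"op":"-D"}
        System.out.println(new String(schema.serialize(row), StandardCharsets.UTF_8));
    }
}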

flink-format-changelog-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory
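
This service entry is what lets the planner resolve the format by its identifier
through Java SPI. Roughly, a discovery sketch (assuming the standard FactoryUtil
lookup path):

import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class DiscoverChangelogJsonFormat {
    public static void main(String[] args) {
        // resolves to ChangelogJsonFormatFactory via the META-INF/services entry above
        DeserializationFormatFactory factory = FactoryUtil.discoverFactory(
                DiscoverChangelogJsonFormat.class.getClassLoader(),
                DeserializationFormatFactory.class,
                "changelog-json");
        System.out.println(factory.factoryIdentifier()); // changelog-json
    }
}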

flink-format-changelog-json/src/test/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonFormatFactoryTest.java
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Tests for {@link ChangelogJsonFormatFactory}.
*/
public class ChangelogJsonFormatFactoryTest extends TestLogger {
@Rule
public ExpectedException thrown = ExpectedException.none();
private static final TableSchema SCHEMA = TableSchema.builder()
.field("a", DataTypes.STRING())
.field("b", DataTypes.INT())
.field("c", DataTypes.BOOLEAN())
.build();
private static final RowType ROW_TYPE = (RowType) SCHEMA.toRowDataType().getLogicalType();
@Test
public void testSeDeSchema() {
final ChangelogJsonDeserializationSchema expectedDeser = new ChangelogJsonDeserializationSchema(
ROW_TYPE,
RowDataTypeInfo.of(ROW_TYPE),
true,
TimestampFormat.ISO_8601);
final ChangelogJsonSerializationSchema expectedSer = new ChangelogJsonSerializationSchema(
ROW_TYPE,
TimestampFormat.ISO_8601);
final Map<String, String> options = getAllOptions();
final DynamicTableSource actualSource = createTableSource(options);
assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
DeserializationSchema<RowData> actualDeser = scanSourceMock.valueFormat
.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE,
SCHEMA.toRowDataType());
assertEquals(expectedDeser, actualDeser);
final DynamicTableSink actualSink = createTableSink(options);
assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
TestDynamicTableFactory.DynamicTableSinkMock sinkMock = (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
SerializationSchema<RowData> actualSer = sinkMock.valueFormat
.createRuntimeEncoder(
new SinkRuntimeProviderContext(false),
SCHEMA.toRowDataType());
assertEquals(expectedSer, actualSer);
}
@Test
public void testInvalidIgnoreParseError() {
final Map<String, String> options =
getModifiedOptions(opts -> opts.put("changelog-json.ignore-parse-errors", "abc"));
try {
createTableSource(options);
fail("expected a failure when parsing the malformed boolean option");
} catch (Exception e) {
assertTrue(ExceptionUtils.findThrowableWithMessage(
e,
"Unrecognized option for boolean: abc. Expected either true or false(case insensitive)").isPresent());
}
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
/**
* Returns the full options modified by the given consumer {@code optionModifier}.
*
* @param optionModifier Consumer to modify the options
*/
private Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
Map<String, String> options = getAllOptions();
optionModifier.accept(options);
return options;
}
private Map<String, String> getAllOptions() {
final Map<String, String> options = new HashMap<>();
options.put("connector", TestDynamicTableFactory.IDENTIFIER);
options.put("target", "MyTarget");
options.put("buffer-size", "1000");
options.put("format", "changelog-json");
options.put("changelog-json.ignore-parse-errors", "true");
options.put("changelog-json.timestamp-format.standard", "ISO-8601");
return options;
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock source"),
new Configuration(),
ChangelogJsonFormatFactoryTest.class.getClassLoader());
}
private static DynamicTableSink createTableSink(Map<String, String> options) {
return FactoryUtil.createTableSink(
null,
ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock sink"),
new Configuration(),
ChangelogJsonFormatFactoryTest.class.getClassLoader());
}
}

flink-format-changelog-json/src/test/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonSerDeTest.java
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.FLOAT;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.junit.Assert.assertEquals;
/**
* Tests for {@link ChangelogJsonSerializationSchema} and {@link ChangelogJsonDeserializationSchema}.
*/
public class ChangelogJsonSerDeTest {
private static final RowType SCHEMA = (RowType) ROW(
FIELD("id", INT().notNull()),
FIELD("name", STRING()),
FIELD("description", STRING()),
FIELD("weight", FLOAT())
).getLogicalType();
@Test
public void testSerializationDeserialization() throws Exception {
List<String> lines = readLines("changelog-json-data.txt");
ChangelogJsonDeserializationSchema deserializationSchema = new ChangelogJsonDeserializationSchema(
SCHEMA,
RowDataTypeInfo.of(SCHEMA),
false,
TimestampFormat.SQL);
deserializationSchema.open(null);
SimpleCollector collector = new SimpleCollector();
for (String line : lines) {
deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
}
// CREATE TABLE product (
// id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
// name VARCHAR(255),
// description VARCHAR(512),
// weight FLOAT
// );
// ALTER TABLE product AUTO_INCREMENT = 101;
//
// INSERT INTO product
// VALUES (default,"scooter","Small 2-wheel scooter",3.14),
// (default,"car battery","12V car battery",8.1),
// (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
// (default,"hammer","12oz carpenter's hammer",0.75),
// (default,"hammer","14oz carpenter's hammer",0.875),
// (default,"hammer","16oz carpenter's hammer",1.0),
// (default,"rocks","box of assorted rocks",5.3),
// (default,"jacket","water resistent black wind breaker",0.1),
// (default,"spare tire","24 inch spare tire",22.2);
// UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
// UPDATE product SET weight='5.1' WHERE id=107;
// INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
// INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
// UPDATE product SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;
// UPDATE product SET weight='5.17' WHERE id=111;
// DELETE FROM product WHERE id=111;
List<String> expected = Arrays.asList(
"+I(101,scooter,Small 2-wheel scooter,3.14)",
"+I(102,car battery,12V car battery,8.1)",
"+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
"+I(104,hammer,12oz carpenter's hammer,0.75)",
"+I(105,hammer,14oz carpenter's hammer,0.875)",
"+I(106,hammer,16oz carpenter's hammer,1.0)",
"+I(107,rocks,box of assorted rocks,5.3)",
"+I(108,jacket,water resistent black wind breaker,0.1)",
"+I(109,spare tire,24 inch spare tire,22.2)",
"-U(106,hammer,16oz carpenter's hammer,1.0)",
"+U(106,hammer,18oz carpenter hammer,1.0)",
"-U(107,rocks,box of assorted rocks,5.3)",
"+U(107,rocks,box of assorted rocks,5.1)",
"+I(110,jacket,water resistent white wind breaker,0.2)",
"+I(111,scooter,Big 2-wheel scooter ,5.18)",
"-U(110,jacket,water resistent white wind breaker,0.2)",
"+U(110,jacket,new water resistent white wind breaker,0.5)",
"-U(111,scooter,Big 2-wheel scooter ,5.18)",
"+U(111,scooter,Big 2-wheel scooter ,5.17)",
"-D(111,scooter,Big 2-wheel scooter ,5.17)"
);
List<String> actual = collector.list.stream()
.map(Object::toString)
.collect(Collectors.toList());
assertEquals(expected, actual);
ChangelogJsonSerializationSchema serializationSchema = new ChangelogJsonSerializationSchema(
SCHEMA,
TimestampFormat.SQL);
serializationSchema.open(null);
List<String> result = new ArrayList<>();
for (RowData rowData : collector.list) {
result.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
}
List<String> expectedResult = Arrays.asList(
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\"+I\"}",
"{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"+I\"}",
"{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op\":\"+I\"}",
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op\":\"+I\"}",
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op\":\"+I\"}",
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"+I\"}",
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"+I\"}",
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op\":\"+I\"}",
"{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"+I\"}",
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"-U\"}",
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op\":\"+U\"}",
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"-U\"}",
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op\":\"+U\"}",
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"+I\"}",
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"+I\"}",
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"-U\"}",
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op\":\"+U\"}",
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"-U\"}",
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"+U\"}",
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"-D\"}"
);
assertEquals(expectedResult, result);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
private static List<String> readLines(String resource) throws IOException {
final URL url = ChangelogJsonSerDeTest.class.getClassLoader().getResource(resource);
assert url != null;
Path path = new File(url.getFile()).toPath();
return Files.readAllLines(path);
}
private static class SimpleCollector implements Collector<RowData> {
private List<RowData> list = new ArrayList<>();
@Override
public void collect(RowData record) {
list.add(record);
}
@Override
public void close() {
// do nothing
}
}
}

flink-format-changelog-json/src/test/resources/changelog-json-data.txt
@@ -0,0 +1,20 @@
{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op":"+I"}
{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"op":"+I"}
{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"op":"+I"}
{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"op":"+I"}
{"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"op":"+I"}
{"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op":"+I"}
{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op":"+I"}
{"data":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"op":"+I"}
{"data":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"op":"+I"}
{"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op":"-U"}
{"data":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"op":"+U"}
{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op":"-U"}
{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"op":"+U"}
{"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"op":"+I"}
{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op":"+I"}
{"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"op":"-U"}
{"data":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"op":"+U"}
{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op":"-U"}
{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op":"+U"}
{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op":"-D"}

pom.xml
@@ -39,6 +39,7 @@ under the License.
<module>flink-connector-postgres-cdc</module>
<module>flink-sql-connector-mysql-cdc</module>
<module>flink-sql-connector-postgres-cdc</module>
<module>flink-format-changelog-json</module>
</modules>
<licenses>
@@ -175,6 +176,8 @@ under the License.
<exclude>**/*.iml</exclude>
<!-- Docs -->
<exclude>**/*.md</exclude>
<!-- Tests -->
<exclude>**/*.txt</exclude>
</excludes>
</configuration>
</plugin>
