diff --git a/flink-format-changelog-json/pom.xml b/flink-format-changelog-json/pom.xml new file mode 100644 index 000000000..e0de96852 --- /dev/null +++ b/flink-format-changelog-json/pom.xml @@ -0,0 +1,76 @@ + + + + + flink-cdc-connectors + com.alibaba.ververica + 1.0.0-SNAPSHOT + + 4.0.0 + + flink-format-changelog-json + flink-format-changelog-json + + jar + + + + org.apache.flink + flink-json + ${flink.version} + provided + + + + + + + org.apache.flink + flink-table-common + ${flink.version} + test + test-jar + + + + + org.apache.flink + flink-table-planner-blink_${scala.binary.version} + ${flink.version} + test + + + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${flink.version} + test + + + + org.apache.flink + flink-table-planner-blink_${scala.binary.version} + ${flink.version} + test-jar + test + + + \ No newline at end of file diff --git a/flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonDeserializationSchema.java b/flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonDeserializationSchema.java new file mode 100644 index 000000000..5f92968bb --- /dev/null +++ b/flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonDeserializationSchema.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.formats.json; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; +import org.apache.flink.formats.json.TimestampFormat; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.Objects; + +import static java.lang.String.format; +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; + + +/** + * Deserialization schema from Changelog Json to Flink Table/SQL internal data structure {@link RowData}. + */ +public class ChangelogJsonDeserializationSchema implements DeserializationSchema { + private static final long serialVersionUID = -2084214292622004460L; + + /** The deserializer to deserialize Debezium JSON data. */ + private final JsonRowDataDeserializationSchema jsonDeserializer; + + /** TypeInformation of the produced {@link RowData}. **/ + private final TypeInformation resultTypeInfo; + + /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */ + private final boolean ignoreParseErrors; + + public ChangelogJsonDeserializationSchema( + RowType rowType, + TypeInformation resultTypeInfo, + boolean ignoreParseErrors, + TimestampFormat timestampFormatOption) { + this.resultTypeInfo = resultTypeInfo; + this.ignoreParseErrors = ignoreParseErrors; + this.jsonDeserializer = new JsonRowDataDeserializationSchema( + createJsonRowType(fromLogicalToDataType(rowType)), + // the result type is never used, so it's fine to pass in Debezium's result type + resultTypeInfo, + false, // ignoreParseErrors already contains the functionality of failOnMissingField + ignoreParseErrors, + timestampFormatOption); + } + + @Override + public RowData deserialize(byte[] message) throws IOException { + throw new RuntimeException( + "Please invoke DeserializationSchema#deserialize(byte[], Collector) instead."); + } + + @Override + public void deserialize(byte[] bytes, Collector out) throws IOException { + try { + GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(bytes); + GenericRowData data = (GenericRowData) row.getField(0); + String op = row.getString(1).toString(); + RowKind rowKind = parseRowKind(op); + data.setRowKind(rowKind); + out.collect(data); + } catch (Throwable t) { + // a big try catch to protect the processing. + if (!ignoreParseErrors) { + throw new IOException(format( + "Corrupt Debezium JSON message '%s'.", new String(bytes)), t); + } + } + } + + private static RowKind parseRowKind(String op) { + switch (op) { + case "+I": + return RowKind.INSERT; + case "-U": + return RowKind.UPDATE_BEFORE; + case "+U": + return RowKind.UPDATE_AFTER; + case "-D": + return RowKind.DELETE; + default: + throw new UnsupportedOperationException("Unsupported operation '" + op + "' for row kind."); + } + } + + @Override + public boolean isEndOfStream(RowData rowData) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return resultTypeInfo; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ChangelogJsonDeserializationSchema that = (ChangelogJsonDeserializationSchema) o; + return ignoreParseErrors == that.ignoreParseErrors && + Objects.equals(jsonDeserializer, that.jsonDeserializer) && + Objects.equals(resultTypeInfo, that.resultTypeInfo); + } + + @Override + public int hashCode() { + return Objects.hash(jsonDeserializer, resultTypeInfo, ignoreParseErrors); + } + + private static RowType createJsonRowType(DataType databaseSchema) { + DataType payload = DataTypes.ROW( + DataTypes.FIELD("data", databaseSchema), + DataTypes.FIELD("op", DataTypes.STRING())); + return (RowType) payload.getLogicalType(); + } +} diff --git a/flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonFormatFactory.java b/flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonFormatFactory.java new file mode 100644 index 000000000..828786cc8 --- /dev/null +++ b/flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonFormatFactory.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.formats.json; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.json.JsonOptions; +import org.apache.flink.formats.json.TimestampFormat; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Format factory for providing configured instances of Debezium JSON to RowData {@link DeserializationSchema}. + */ +public class ChangelogJsonFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory { + + public static final String IDENTIFIER = "changelog-json"; + + public static final ConfigOption IGNORE_PARSE_ERRORS = JsonOptions.IGNORE_PARSE_ERRORS; + + public static final ConfigOption TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT; + + @Override + public DecodingFormat> createDecodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS); + TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions); + + return new DecodingFormat>() { + @SuppressWarnings("unchecked") + @Override + public DeserializationSchema createRuntimeDecoder( + DynamicTableSource.Context context, DataType producedDataType) { + final RowType rowType = (RowType) producedDataType.getLogicalType(); + final TypeInformation rowDataTypeInfo = + (TypeInformation) context.createTypeInformation(producedDataType); + return new ChangelogJsonDeserializationSchema( + rowType, + rowDataTypeInfo, + ignoreParseErrors, + timestampFormat); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + }; + } + + @Override + public EncodingFormat> createEncodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions); + + return new EncodingFormat>() { + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + @Override + public SerializationSchema createRuntimeEncoder(DynamicTableSink.Context context, DataType consumedDataType) { + final RowType rowType = (RowType) consumedDataType.getLogicalType(); + return new ChangelogJsonSerializationSchema( + rowType, + timestampFormat + ); + } + }; + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(IGNORE_PARSE_ERRORS); + options.add(TIMESTAMP_FORMAT); + return options; + } +} diff --git a/flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonSerializationSchema.java b/flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonSerializationSchema.java new file mode 100644 index 000000000..a54064db7 --- /dev/null +++ b/flink-format-changelog-json/src/main/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonSerializationSchema.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.formats.json; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.json.JsonRowDataSerializationSchema; +import org.apache.flink.formats.json.TimestampFormat; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import java.util.Objects; + +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; + +/** + * Serialization schema from Flink Table/SQL internal data structure {@link RowData} to Changelog Json. + */ +public class ChangelogJsonSerializationSchema implements SerializationSchema { + private static final long serialVersionUID = -3999450457829887684L; + + private static final StringData OP_INSERT = StringData.fromString("+I"); + private static final StringData OP_UPDATE_BEFORE = StringData.fromString("-U"); + private static final StringData OP_UPDATE_AFTER = StringData.fromString("+U"); + private static final StringData OP_DELETE = StringData.fromString("-D"); + + private final JsonRowDataSerializationSchema jsonSerializer; + + /** Timestamp format specification which is used to parse timestamp. */ + private final TimestampFormat timestampFormat; + + private transient GenericRowData reuse; + + public ChangelogJsonSerializationSchema(RowType rowType, TimestampFormat timestampFormat) { + this.jsonSerializer = new JsonRowDataSerializationSchema( + createJsonRowType(fromLogicalToDataType(rowType)), + timestampFormat); + this.timestampFormat = timestampFormat; + } + + @Override + public void open(InitializationContext context) throws Exception { + this.reuse = new GenericRowData(2); + } + + @Override + public byte[] serialize(RowData rowData) { + reuse.setField(0, rowData); + reuse.setField(1, stringifyRowKind(rowData.getRowKind())); + return jsonSerializer.serialize(reuse); + } + + private static StringData stringifyRowKind(RowKind rowKind) { + switch (rowKind) { + case INSERT: + return OP_INSERT; + case UPDATE_BEFORE: + return OP_UPDATE_BEFORE; + case UPDATE_AFTER: + return OP_UPDATE_AFTER; + case DELETE: + return OP_DELETE; + default: + throw new UnsupportedOperationException("Unsupported operation '" + rowKind + "' for row kind."); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ChangelogJsonSerializationSchema that = (ChangelogJsonSerializationSchema) o; + return Objects.equals(jsonSerializer, that.jsonSerializer) && + timestampFormat == that.timestampFormat; + } + + @Override + public int hashCode() { + return Objects.hash(jsonSerializer, timestampFormat); + } + + private static RowType createJsonRowType(DataType databaseSchema) { + DataType payload = DataTypes.ROW( + DataTypes.FIELD("data", databaseSchema), + DataTypes.FIELD("op", DataTypes.STRING())); + return (RowType) payload.getLogicalType(); + } +} diff --git a/flink-format-changelog-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-format-changelog-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 000000000..cc04de304 --- /dev/null +++ b/flink-format-changelog-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory diff --git a/flink-format-changelog-json/src/test/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonFormatFactoryTest.java b/flink-format-changelog-json/src/test/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonFormatFactoryTest.java new file mode 100644 index 000000000..b4891c879 --- /dev/null +++ b/flink-format-changelog-json/src/test/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonFormatFactoryTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.formats.json; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.formats.json.TimestampFormat; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.TestDynamicTableFactory; +import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link ChangelogJsonFormatFactoryTest}. + */ +public class ChangelogJsonFormatFactoryTest extends TestLogger { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private static final TableSchema SCHEMA = TableSchema.builder() + .field("a", DataTypes.STRING()) + .field("b", DataTypes.INT()) + .field("c", DataTypes.BOOLEAN()) + .build(); + + private static final RowType ROW_TYPE = (RowType) SCHEMA.toRowDataType().getLogicalType(); + + @Test + public void testSeDeSchema() { + final ChangelogJsonDeserializationSchema expectedDeser = new ChangelogJsonDeserializationSchema( + ROW_TYPE, + RowDataTypeInfo.of(ROW_TYPE), + true, + TimestampFormat.ISO_8601); + final ChangelogJsonSerializationSchema expectedSer = new ChangelogJsonSerializationSchema( + ROW_TYPE, + TimestampFormat.ISO_8601); + + final Map options = getAllOptions(); + + final DynamicTableSource actualSource = createTableSource(options); + assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock; + TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock = + (TestDynamicTableFactory.DynamicTableSourceMock) actualSource; + + DeserializationSchema actualDeser = scanSourceMock.valueFormat + .createRuntimeDecoder( + ScanRuntimeProviderContext.INSTANCE, + SCHEMA.toRowDataType()); + + assertEquals(expectedDeser, actualDeser); + + final DynamicTableSink actualSink = createTableSink(options); + assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock; + TestDynamicTableFactory.DynamicTableSinkMock sinkMock = (TestDynamicTableFactory.DynamicTableSinkMock) actualSink; + + SerializationSchema actualSer = sinkMock.valueFormat + .createRuntimeEncoder( + new SinkRuntimeProviderContext(false), + SCHEMA.toRowDataType()); + + assertEquals(expectedSer, actualSer); + } + + @Test + public void testInvalidIgnoreParseError() { + final Map options = + getModifiedOptions(opts -> opts.put("changelog-json.ignore-parse-errors", "abc")); + + try { + createTableSource(options); + } catch (Exception e) { + assertTrue(ExceptionUtils.findThrowableWithMessage( + e, + "Unrecognized option for boolean: abc. Expected either true or false(case insensitive)").isPresent()); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Returns the full options modified by the given consumer {@code optionModifier}. + * + * @param optionModifier Consumer to modify the options + */ + private Map getModifiedOptions(Consumer> optionModifier) { + Map options = getAllOptions(); + optionModifier.accept(options); + return options; + } + + private Map getAllOptions() { + final Map options = new HashMap<>(); + options.put("connector", TestDynamicTableFactory.IDENTIFIER); + options.put("target", "MyTarget"); + options.put("buffer-size", "1000"); + + options.put("format", "changelog-json"); + options.put("changelog-json.ignore-parse-errors", "true"); + options.put("changelog-json.timestamp-format.standard", "ISO-8601"); + return options; + } + + private static DynamicTableSource createTableSource(Map options) { + return FactoryUtil.createTableSource( + null, + ObjectIdentifier.of("default", "default", "t1"), + new CatalogTableImpl(SCHEMA, options, "mock source"), + new Configuration(), + ChangelogJsonFormatFactoryTest.class.getClassLoader()); + } + + private static DynamicTableSink createTableSink(Map options) { + return FactoryUtil.createTableSink( + null, + ObjectIdentifier.of("default", "default", "t1"), + new CatalogTableImpl(SCHEMA, options, "mock sink"), + new Configuration(), + ChangelogJsonFormatFactoryTest.class.getClassLoader()); + } +} diff --git a/flink-format-changelog-json/src/test/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonSerDeTest.java b/flink-format-changelog-json/src/test/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonSerDeTest.java new file mode 100644 index 000000000..49790a33e --- /dev/null +++ b/flink-format-changelog-json/src/test/java/com/alibaba/ververica/cdc/formats/json/ChangelogJsonSerDeTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.formats.json; + +import org.apache.flink.formats.json.TimestampFormat; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.FLOAT; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link ChangelogJsonSerializationSchema} and {@link ChangelogJsonDeserializationSchema}. + */ +public class ChangelogJsonSerDeTest { + + private static final RowType SCHEMA = (RowType) ROW( + FIELD("id", INT().notNull()), + FIELD("name", STRING()), + FIELD("description", STRING()), + FIELD("weight", FLOAT()) + ).getLogicalType(); + + @Test + public void testSerializationDeserialization() throws Exception { + List lines = readLines("changelog-json-data.txt"); + ChangelogJsonDeserializationSchema deserializationSchema = new ChangelogJsonDeserializationSchema( + SCHEMA, + RowDataTypeInfo.of(SCHEMA), + false, + TimestampFormat.SQL); + + deserializationSchema.open(null); + SimpleCollector collector = new SimpleCollector(); + for (String line : lines) { + deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector); + } + + // CREATE TABLE product ( + // id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + // name VARCHAR(255), + // description VARCHAR(512), + // weight FLOAT + // ); + // ALTER TABLE product AUTO_INCREMENT = 101; + // + // INSERT INTO product + // VALUES (default,"scooter","Small 2-wheel scooter",3.14), + // (default,"car battery","12V car battery",8.1), + // (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + // (default,"hammer","12oz carpenter's hammer",0.75), + // (default,"hammer","14oz carpenter's hammer",0.875), + // (default,"hammer","16oz carpenter's hammer",1.0), + // (default,"rocks","box of assorted rocks",5.3), + // (default,"jacket","water resistent black wind breaker",0.1), + // (default,"spare tire","24 inch spare tire",22.2); + // UPDATE product SET description='18oz carpenter hammer' WHERE id=106; + // UPDATE product SET weight='5.1' WHERE id=107; + // INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2); + // INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18); + // UPDATE product SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110; + // UPDATE product SET weight='5.17' WHERE id=111; + // DELETE FROM product WHERE id=111; + List expected = Arrays.asList( + "+I(101,scooter,Small 2-wheel scooter,3.14)", + "+I(102,car battery,12V car battery,8.1)", + "+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8)", + "+I(104,hammer,12oz carpenter's hammer,0.75)", + "+I(105,hammer,14oz carpenter's hammer,0.875)", + "+I(106,hammer,16oz carpenter's hammer,1.0)", + "+I(107,rocks,box of assorted rocks,5.3)", + "+I(108,jacket,water resistent black wind breaker,0.1)", + "+I(109,spare tire,24 inch spare tire,22.2)", + "-U(106,hammer,16oz carpenter's hammer,1.0)", + "+U(106,hammer,18oz carpenter hammer,1.0)", + "-U(107,rocks,box of assorted rocks,5.3)", + "+U(107,rocks,box of assorted rocks,5.1)", + "+I(110,jacket,water resistent white wind breaker,0.2)", + "+I(111,scooter,Big 2-wheel scooter ,5.18)", + "-U(110,jacket,water resistent white wind breaker,0.2)", + "+U(110,jacket,new water resistent white wind breaker,0.5)", + "-U(111,scooter,Big 2-wheel scooter ,5.18)", + "+U(111,scooter,Big 2-wheel scooter ,5.17)", + "-D(111,scooter,Big 2-wheel scooter ,5.17)" + ); + List actual = collector.list.stream() + .map(Object::toString) + .collect(Collectors.toList()); + assertEquals(expected, actual); + + ChangelogJsonSerializationSchema serializationSchema = new ChangelogJsonSerializationSchema( + SCHEMA, + TimestampFormat.SQL); + serializationSchema.open(null); + List result = new ArrayList<>(); + for (RowData rowData : collector.list) { + result.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8)); + } + List expectedResult = Arrays.asList( + "{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\"+I\"}", + "{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"+I\"}", + "{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op\":\"+I\"}", + "{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op\":\"+I\"}", + "{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op\":\"+I\"}", + "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"+I\"}", + "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"+I\"}", + "{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op\":\"+I\"}", + "{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"+I\"}", + "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"-U\"}", + "{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op\":\"+U\"}", + "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"-U\"}", + "{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op\":\"+U\"}", + "{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"+I\"}", + "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"+I\"}", + "{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"-U\"}", + "{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op\":\"+U\"}", + "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"-U\"}", + "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"+U\"}", + "{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"-D\"}" + ); + assertEquals(expectedResult, result); + } + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + private static List readLines(String resource) throws IOException { + final URL url = ChangelogJsonSerDeTest.class.getClassLoader().getResource(resource); + assert url != null; + Path path = new File(url.getFile()).toPath(); + return Files.readAllLines(path); + } + + private static class SimpleCollector implements Collector { + + private List list = new ArrayList<>(); + + @Override + public void collect(RowData record) { + list.add(record); + } + + @Override + public void close() { + // do nothing + } + } +} diff --git a/flink-format-changelog-json/src/test/resources/changelog-json-data.txt b/flink-format-changelog-json/src/test/resources/changelog-json-data.txt new file mode 100644 index 000000000..341bee6f1 --- /dev/null +++ b/flink-format-changelog-json/src/test/resources/changelog-json-data.txt @@ -0,0 +1,20 @@ +{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op":"+I"} +{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"op":"+I"} +{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"op":"+I"} +{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"op":"+I"} +{"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"op":"+I"} +{"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op":"+I"} +{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op":"+I"} +{"data":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"op":"+I"} +{"data":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"op":"+I"} +{"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op":"-U"} +{"data":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"op":"+U"} +{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op":"-U"} +{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"op":"+U"} +{"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"op":"+I"} +{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op":"+I"} +{"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"op":"-U"} +{"data":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"op":"+U"} +{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op":"-U"} +{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op":"+U"} +{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op":"-D"} \ No newline at end of file diff --git a/pom.xml b/pom.xml index fd5686af4..c99a94c83 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ under the License. flink-connector-postgres-cdc flink-sql-connector-mysql-cdc flink-sql-connector-postgres-cdc + flink-format-changelog-json @@ -175,6 +176,8 @@ under the License. **/*.iml **/*.md + + **/*.txt