[format][changelog-json] Remove the support of changelog-json format (#912)

Leonard Xu 3 years ago committed by GitHub
parent c77275cb68
commit 3f26906425

@@ -15,12 +15,6 @@ The Flink CDC Connectors integrates Debezium as the engine to capture data changes
| Oracle | Database: 11, 12, 19 <br/>Oracle Driver: 19.3.0.0|
| Sqlserver | Database: 2017, 2019 <br/>JDBC Driver: 7.2.2.jre8|
## Supported Formats
| Format | Supported Connector | Flink Version |
| --- | --- | --- |
| <a href="https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format">Changelog Json</a> | <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/connectors/kafka.html">Apache Kafka</a> | 1.11+ |
## Supported Flink Versions
The following table shows the version mapping between Flink CDC Connectors and Flink:

@@ -1,119 +1,8 @@
# Changelog JSON Format
Flink supports emitting changelogs in JSON format and interpreting the output back again.
**WARNING:** The CDC format `changelog-json` is deprecated since Flink CDC version 2.2.
The CDC format `changelog-json` was introduced at a time when Flink did not offer any CDC format. Flink now provides several well-maintained CDC formats, e.g. [Debezium CDC, Maxwell CDC, Canal CDC](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/overview/), and we recommend users migrate to one of them.
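For most pipelines the migration is a one-line change to the sink table's DDL. A minimal sketch, assuming a Kafka sink table like the `day_uv` example shown later on this page and `debezium-json` as the replacement format:
```sql
-- hypothetical migration example: the table definition is unchanged,
-- only the 'format' option moves off the deprecated 'changelog-json'
CREATE TABLE day_uv (
  day_str STRING,
  uv BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'day_uv',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'  -- was: 'format' = 'changelog-json'
);
```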
Dependencies
------------
In order to set up the Changelog JSON format, the following sections provide dependency information both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
### Maven dependency
```xml
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <!-- the dependency is available only for stable releases. -->
  <version>2.2-SNAPSHOT</version>
</dependency>
```
### SQL Client JAR
```Download link is available only for stable releases.```
Download [flink-format-changelog-json-2.2-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-format-changelog-json/2.2-SNAPSHOT/flink-format-changelog-json-2.2-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`.
How to use Changelog JSON format
----------------
```sql
-- assuming we have user_behavior logs
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',  -- using the kafka connector
  'topic' = 'user_behavior',  -- kafka topic
  'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
  'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
  'format' = 'json'  -- the data format is json
);

-- we want to store the UV aggregation result in kafka using the changelog-json format
CREATE TABLE day_uv (
  day_str STRING,
  uv BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'day_uv',
  'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
  'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
  'format' = 'changelog-json'  -- the data format is changelog-json
);

-- write the UV results into kafka using the changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') AS day_str, COUNT(DISTINCT user_id) AS uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');

-- read the changelog back again
SELECT * FROM day_uv;
```
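Each record emitted by the sink is a flat JSON object that wraps the row in a `data` field and the change operation in an `op` field (`+I` insert, `-U` update-before, `+U` update-after, `-D` delete). For the `day_uv` table above, an insert record would look like the following (illustrative values):
```json
{"data": {"day_str": "2020-12-30", "uv": 1024}, "op": "+I"}
```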
Format Options
----------------
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 50%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>format</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what format to use; here it should be 'changelog-json'.</td>
</tr>
<tr>
<td>changelog-json.ignore-parse-errors</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
<tr>
<td>changelog-json.timestamp-format.standard</td>
<td>optional</td>
<td style="word-wrap: break-word;">'SQL'</td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are 'SQL' and 'ISO-8601':
<ul>
<li>Option 'SQL' will parse input timestamps in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123', and output timestamps in the same format.</li>
<li>Option 'ISO-8601' will parse input timestamps in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123', and output timestamps in the same format.</li>
</ul>
</td>
</tr>
</tbody>
</table>
</div>
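Both options are prefixed with the format identifier and set in the table's `WITH` clause next to the `format` option itself; a minimal sketch:
```sql
CREATE TABLE day_uv (
  day_str STRING,
  uv BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'day_uv',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'changelog-json',
  'changelog-json.ignore-parse-errors' = 'true',  -- skip corrupt records instead of failing
  'changelog-json.timestamp-format.standard' = 'ISO-8601'  -- use ISO-8601 timestamps
);
```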
Data Type Mapping
----------------
Currently, the Changelog JSON format uses the JSON format for serialization and deserialization. Please refer to the [JSON format documentation](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#data-type-mapping) for more details about the data type mapping.
### Compatibility Note
Users can still obtain and use the deprecated `changelog-json` format from an older Flink CDC version, e.g. [flink-format-changelog-json-2.1.1.jar](https://repo1.maven.org/maven2/com/ververica/flink-format-changelog-json/2.1.1/flink-format-changelog-json-2.1.1-SNAPSHOT.jar).

@@ -1,76 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.ververica</groupId>
<version>2.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-format-changelog-json</artifactId>
<name>flink-format-changelog-json</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<!-- table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- RowData (de)serialization schema testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- RowData encoder/InputFormat testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@@ -1,150 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.Objects;
import static java.lang.String.format;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
/**
* Deserialization schema from Changelog Json to Flink Table/SQL internal data structure {@link
* RowData}.
*/
public class ChangelogJsonDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = -2084214292622004460L;
/** The deserializer to deserialize Changelog JSON data. */
private final JsonRowDataDeserializationSchema jsonDeserializer;
/** TypeInformation of the produced {@link RowData}. */
private final TypeInformation<RowData> resultTypeInfo;
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
private final boolean ignoreParseErrors;
public ChangelogJsonDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean ignoreParseErrors,
TimestampFormat timestampFormatOption) {
this.resultTypeInfo = resultTypeInfo;
this.ignoreParseErrors = ignoreParseErrors;
this.jsonDeserializer =
new JsonRowDataDeserializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
// the result type of the inner JSON deserializer is never used,
// so it is fine to pass in the outer produced type information
resultTypeInfo,
false, // ignoreParseErrors already contains the functionality of
// failOnMissingField
ignoreParseErrors,
timestampFormatOption);
}
@Override
public RowData deserialize(byte[] message) throws IOException {
throw new RuntimeException(
"Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
}
@Override
public void deserialize(byte[] bytes, Collector<RowData> out) throws IOException {
try {
GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(bytes);
GenericRowData data = (GenericRowData) row.getField(0);
String op = row.getString(1).toString();
RowKind rowKind = parseRowKind(op);
data.setRowKind(rowKind);
out.collect(data);
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!ignoreParseErrors) {
throw new IOException(
format("Corrupt Changelog JSON message '%s'.", new String(bytes)), t);
}
}
}
private static RowKind parseRowKind(String op) {
switch (op) {
case "+I":
return RowKind.INSERT;
case "-U":
return RowKind.UPDATE_BEFORE;
case "+U":
return RowKind.UPDATE_AFTER;
case "-D":
return RowKind.DELETE;
default:
throw new UnsupportedOperationException(
"Unsupported operation '" + op + "' for row kind.");
}
}
@Override
public boolean isEndOfStream(RowData rowData) {
return false;
}
@Override
public TypeInformation<RowData> getProducedType() {
return resultTypeInfo;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChangelogJsonDeserializationSchema that = (ChangelogJsonDeserializationSchema) o;
return ignoreParseErrors == that.ignoreParseErrors
&& Objects.equals(jsonDeserializer, that.jsonDeserializer)
&& Objects.equals(resultTypeInfo, that.resultTypeInfo);
}
@Override
public int hashCode() {
return Objects.hash(jsonDeserializer, resultTypeInfo, ignoreParseErrors);
}
private static RowType createJsonRowType(DataType databaseSchema) {
DataType payload =
DataTypes.ROW(
DataTypes.FIELD("data", databaseSchema),
DataTypes.FIELD("op", DataTypes.STRING()));
return (RowType) payload.getLogicalType();
}
}

@@ -1,133 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* Format factory for providing configured instances of Changelog JSON to RowData {@link
* DeserializationSchema}.
*/
public class ChangelogJsonFormatFactory
implements DeserializationFormatFactory, SerializationFormatFactory {
public static final String IDENTIFIER = "changelog-json";
public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonOptions.IGNORE_PARSE_ERRORS;
public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(producedDataType);
return new ChangelogJsonDeserializationSchema(
rowType, rowDataTypeInfo, ignoreParseErrors, timestampFormat);
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
}
};
}
@Override
public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
}
@Override
public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context, DataType consumedDataType) {
final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new ChangelogJsonSerializationSchema(rowType, timestampFormat);
}
};
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IGNORE_PARSE_ERRORS);
options.add(TIMESTAMP_FORMAT);
return options;
}
}

@@ -1,120 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.util.Objects;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
/**
* Serialization schema from Flink Table/SQL internal data structure {@link RowData} to Changelog
* Json.
*/
public class ChangelogJsonSerializationSchema implements SerializationSchema<RowData> {
private static final long serialVersionUID = -3999450457829887684L;
private static final StringData OP_INSERT = StringData.fromString("+I");
private static final StringData OP_UPDATE_BEFORE = StringData.fromString("-U");
private static final StringData OP_UPDATE_AFTER = StringData.fromString("+U");
private static final StringData OP_DELETE = StringData.fromString("-D");
private final JsonRowDataSerializationSchema jsonSerializer;
/** Timestamp format specification which is used when formatting timestamps. */
private final TimestampFormat timestampFormat;
private transient GenericRowData reuse;
public ChangelogJsonSerializationSchema(RowType rowType, TimestampFormat timestampFormat) {
this.jsonSerializer =
new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
JsonOptions.MapNullKeyMode.FAIL,
JsonOptions.MAP_NULL_KEY_LITERAL.defaultValue(),
JsonOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER.defaultValue());
this.timestampFormat = timestampFormat;
}
@Override
public void open(InitializationContext context) throws Exception {
this.reuse = new GenericRowData(2);
}
@Override
public byte[] serialize(RowData rowData) {
reuse.setField(0, rowData);
reuse.setField(1, stringifyRowKind(rowData.getRowKind()));
return jsonSerializer.serialize(reuse);
}
private static StringData stringifyRowKind(RowKind rowKind) {
switch (rowKind) {
case INSERT:
return OP_INSERT;
case UPDATE_BEFORE:
return OP_UPDATE_BEFORE;
case UPDATE_AFTER:
return OP_UPDATE_AFTER;
case DELETE:
return OP_DELETE;
default:
throw new UnsupportedOperationException(
"Unsupported operation '" + rowKind + "' for row kind.");
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChangelogJsonSerializationSchema that = (ChangelogJsonSerializationSchema) o;
return Objects.equals(jsonSerializer, that.jsonSerializer)
&& timestampFormat == that.timestampFormat;
}
@Override
public int hashCode() {
return Objects.hash(jsonSerializer, timestampFormat);
}
private static RowType createJsonRowType(DataType databaseSchema) {
DataType payload =
DataTypes.ROW(
DataTypes.FIELD("data", databaseSchema),
DataTypes.FIELD("op", DataTypes.STRING()));
return (RowType) payload.getLogicalType();
}
}

@@ -1,16 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.ververica.cdc.formats.json.ChangelogJsonFormatFactory

@@ -1,179 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Tests for {@link ChangelogJsonFormatFactory}. */
public class ChangelogJsonFormatFactoryTest extends TestLogger {
@Rule public ExpectedException thrown = ExpectedException.none();
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
Arrays.asList(
Column.physical("a", DataTypes.STRING()),
Column.physical("b", DataTypes.STRING()),
Column.physical("c", DataTypes.BOOLEAN())),
new ArrayList<>(),
null);
private static final RowType ROW_TYPE = (RowType) SCHEMA.toSourceRowDataType().getLogicalType();
@Test
public void testSeDeSchema() {
final ChangelogJsonDeserializationSchema expectedDeser =
new ChangelogJsonDeserializationSchema(
ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), true, TimestampFormat.ISO_8601);
final ChangelogJsonSerializationSchema expectedSer =
new ChangelogJsonSerializationSchema(ROW_TYPE, TimestampFormat.ISO_8601);
final Map<String, String> options = getAllOptions();
final DynamicTableSource actualSource = createTableSource(options);
assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
DeserializationSchema<RowData> actualDeser =
scanSourceMock.valueFormat.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE, SCHEMA.toSourceRowDataType());
assertEquals(expectedDeser, actualDeser);
final DynamicTableSink actualSink = createTableSink(options);
assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
SerializationSchema<RowData> actualSer =
sinkMock.valueFormat.createRuntimeEncoder(
new SinkRuntimeProviderContext(false), SCHEMA.toSourceRowDataType());
assertEquals(expectedSer, actualSer);
}
@Test
public void testInvalidIgnoreParseError() {
final Map<String, String> options =
getModifiedOptions(opts -> opts.put("changelog-json.ignore-parse-errors", "abc"));
try {
createTableSource(options);
// without this, the test would silently pass when no exception is thrown
fail("Expected an exception for the invalid 'changelog-json.ignore-parse-errors' value.");
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
e,
"Unrecognized option for boolean: abc. Expected either true or false(case insensitive)")
.isPresent());
}
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
/**
* Returns the full options modified by the given consumer {@code optionModifier}.
*
* @param optionModifier Consumer to modify the options
*/
private Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
Map<String, String> options = getAllOptions();
optionModifier.accept(options);
return options;
}
private Map<String, String> getAllOptions() {
final Map<String, String> options = new HashMap<>();
options.put("connector", TestDynamicTableFactory.IDENTIFIER);
options.put("target", "MyTarget");
options.put("buffer-size", "1000");
options.put("format", "changelog-json");
options.put("changelog-json.ignore-parse-errors", "true");
options.put("changelog-json.timestamp-format.standard", "ISO-8601");
return options;
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new ResolvedCatalogTable(
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(SCHEMA).build(),
"mock source",
new ArrayList<>(),
options),
SCHEMA),
new Configuration(),
ChangelogJsonFormatFactoryTest.class.getClassLoader(),
false);
}
private static DynamicTableSink createTableSink(Map<String, String> options) {
return FactoryUtil.createTableSink(
null,
ObjectIdentifier.of("default", "default", "t1"),
new ResolvedCatalogTable(
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(SCHEMA).build(),
"mock source",
new ArrayList<>(),
options),
SCHEMA),
new Configuration(),
ChangelogJsonFormatFactoryTest.class.getClassLoader(),
false);
}
}

@@ -1,185 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.formats.json;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.FLOAT;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.junit.Assert.assertEquals;
/**
* Tests for {@link ChangelogJsonSerializationSchema} and {@link
* ChangelogJsonDeserializationSchema}.
*/
public class ChangelogJsonSerDeTest {
private static final RowType SCHEMA =
(RowType)
ROW(
FIELD("id", INT().notNull()),
FIELD("name", STRING()),
FIELD("description", STRING()),
FIELD("weight", FLOAT()))
.getLogicalType();
@Test
public void testSerializationDeserialization() throws Exception {
List<String> lines = readLines("changelog-json-data.txt");
ChangelogJsonDeserializationSchema deserializationSchema =
new ChangelogJsonDeserializationSchema(
SCHEMA, InternalTypeInfo.of(SCHEMA), false, TimestampFormat.SQL);
deserializationSchema.open(null);
SimpleCollector collector = new SimpleCollector();
for (String line : lines) {
deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector);
}
// CREATE TABLE product (
// id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
// name VARCHAR(255),
// description VARCHAR(512),
// weight FLOAT
// );
// ALTER TABLE product AUTO_INCREMENT = 101;
//
// INSERT INTO product
// VALUES (default,"scooter","Small 2-wheel scooter",3.14),
// (default,"car battery","12V car battery",8.1),
// (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40
// to #3",0.8),
// (default,"hammer","12oz carpenter's hammer",0.75),
// (default,"hammer","14oz carpenter's hammer",0.875),
// (default,"hammer","16oz carpenter's hammer",1.0),
// (default,"rocks","box of assorted rocks",5.3),
// (default,"jacket","water resistent black wind breaker",0.1),
// (default,"spare tire","24 inch spare tire",22.2);
// UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
// UPDATE product SET weight='5.1' WHERE id=107;
// INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
// INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
// UPDATE product SET description='new water resistent white wind breaker', weight='0.5'
// WHERE id=110;
// UPDATE product SET weight='5.17' WHERE id=111;
// DELETE FROM product WHERE id=111;
List<String> expected =
Arrays.asList(
"+I(101,scooter,Small 2-wheel scooter,3.14)",
"+I(102,car battery,12V car battery,8.1)",
"+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
"+I(104,hammer,12oz carpenter's hammer,0.75)",
"+I(105,hammer,14oz carpenter's hammer,0.875)",
"+I(106,hammer,16oz carpenter's hammer,1.0)",
"+I(107,rocks,box of assorted rocks,5.3)",
"+I(108,jacket,water resistent black wind breaker,0.1)",
"+I(109,spare tire,24 inch spare tire,22.2)",
"-U(106,hammer,16oz carpenter's hammer,1.0)",
"+U(106,hammer,18oz carpenter hammer,1.0)",
"-U(107,rocks,box of assorted rocks,5.3)",
"+U(107,rocks,box of assorted rocks,5.1)",
"+I(110,jacket,water resistent white wind breaker,0.2)",
"+I(111,scooter,Big 2-wheel scooter ,5.18)",
"-U(110,jacket,water resistent white wind breaker,0.2)",
"+U(110,jacket,new water resistent white wind breaker,0.5)",
"-U(111,scooter,Big 2-wheel scooter ,5.18)",
"+U(111,scooter,Big 2-wheel scooter ,5.17)",
"-D(111,scooter,Big 2-wheel scooter ,5.17)");
List<String> actual =
collector.list.stream().map(Object::toString).collect(Collectors.toList());
assertEquals(expected, actual);
ChangelogJsonSerializationSchema serializationSchema =
new ChangelogJsonSerializationSchema(SCHEMA, TimestampFormat.SQL);
serializationSchema.open(null);
List<String> result = new ArrayList<>();
for (RowData rowData : collector.list) {
result.add(new String(serializationSchema.serialize(rowData), StandardCharsets.UTF_8));
}
List<String> expectedResult =
Arrays.asList(
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"op\":\"+I\"}",
"{\"data\":{\"id\":102,\"name\":\"car battery\",\"description\":\"12V car battery\",\"weight\":8.1},\"op\":\"+I\"}",
"{\"data\":{\"id\":103,\"name\":\"12-pack drill bits\",\"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\"weight\":0.8},\"op\":\"+I\"}",
"{\"data\":{\"id\":104,\"name\":\"hammer\",\"description\":\"12oz carpenter's hammer\",\"weight\":0.75},\"op\":\"+I\"}",
"{\"data\":{\"id\":105,\"name\":\"hammer\",\"description\":\"14oz carpenter's hammer\",\"weight\":0.875},\"op\":\"+I\"}",
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"+I\"}",
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"+I\"}",
"{\"data\":{\"id\":108,\"name\":\"jacket\",\"description\":\"water resistent black wind breaker\",\"weight\":0.1},\"op\":\"+I\"}",
"{\"data\":{\"id\":109,\"name\":\"spare tire\",\"description\":\"24 inch spare tire\",\"weight\":22.2},\"op\":\"+I\"}",
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"16oz carpenter's hammer\",\"weight\":1.0},\"op\":\"-U\"}",
"{\"data\":{\"id\":106,\"name\":\"hammer\",\"description\":\"18oz carpenter hammer\",\"weight\":1.0},\"op\":\"+U\"}",
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.3},\"op\":\"-U\"}",
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted rocks\",\"weight\":5.1},\"op\":\"+U\"}",
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"+I\"}",
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"+I\"}",
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"water resistent white wind breaker\",\"weight\":0.2},\"op\":\"-U\"}",
"{\"data\":{\"id\":110,\"name\":\"jacket\",\"description\":\"new water resistent white wind breaker\",\"weight\":0.5},\"op\":\"+U\"}",
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.18},\"op\":\"-U\"}",
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"+U\"}",
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"op\":\"-D\"}");
assertEquals(expectedResult, result);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
private static List<String> readLines(String resource) throws IOException {
final URL url = ChangelogJsonSerDeTest.class.getClassLoader().getResource(resource);
assert url != null;
Path path = new File(url.getFile()).toPath();
return Files.readAllLines(path);
}
private static class SimpleCollector implements Collector<RowData> {
private List<RowData> list = new ArrayList<>();
@Override
public void collect(RowData record) {
list.add(record);
}
@Override
public void close() {
// do nothing
}
}
}

@@ -1,20 +0,0 @@
{"data":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op":"+I"}
{"data":{"id":102,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"op":"+I"}
{"data":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"op":"+I"}
{"data":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"op":"+I"}
{"data":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"op":"+I"}
{"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op":"+I"}
{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op":"+I"}
{"data":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"op":"+I"}
{"data":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"op":"+I"}
{"data":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"op":"-U"}
{"data":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1},"op":"+U"}
{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"op":"-U"}
{"data":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"op":"+U"}
{"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"op":"+I"}
{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op":"+I"}
{"data":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"op":"-U"}
{"data":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"op":"+U"}
{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"op":"-U"}
{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op":"+U"}
{"data":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"op":"-D"}

@@ -45,7 +45,6 @@ under the License.
<module>flink-sql-connector-mongodb-cdc</module>
<module>flink-sql-connector-oracle-cdc</module>
<module>flink-sql-connector-sqlserver-cdc</module>
<module>flink-format-changelog-json</module>
<module>flink-cdc-e2e-tests</module>
</modules>
