From a1c9c19d7d378a2de5044e79516630b5baaf9cc6 Mon Sep 17 00:00:00 2001 From: Seung-Min Lee Date: Tue, 22 Oct 2024 11:32:21 +0900 Subject: [PATCH] [FLINK-36578][pipeline-connector/mysql] Introduce option to unify json type output between snapshot phase and binlog phase This closes #3658 --- .../connectors/flink-sources/mysql-cdc.md | 14 +- .../connectors/pipeline-connectors/mysql.md | 12 + .../mysql/factory/MySqlDataSourceFactory.java | 6 +- .../mysql/source/MySqlDataSourceOptions.java | 8 + .../mysql/source/MySqlFullTypesITCase.java | 123 ++++++- .../source/MySqlMetadataAccessorITCase.java | 66 +++- .../test/resources/ddl/column_type_test.sql | 20 +- .../resources/ddl/column_type_test_mysql8.sql | 18 + .../json/JsonStringFormatter.java | 337 ++++++++++++++++++ .../cdc/connectors/mysql/MySqlSource.java | 6 + .../mysql/source/MySqlSourceBuilder.java | 9 + .../source/config/MySqlSourceConfig.java | 5 +- .../config/MySqlSourceConfigFactory.java | 13 +- .../source/config/MySqlSourceOptions.java | 8 + .../mysql/table/MySqlTableSource.java | 9 +- .../mysql/table/MySqlTableSourceFactory.java | 5 +- .../table/MySqlTableSourceFactoryTest.java | 35 +- .../test/resources/ddl/column_type_test.sql | 2 +- .../resources/ddl/column_type_test_mysql8.sql | 2 +- 19 files changed, 670 insertions(+), 28 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 60d81baa2..4bd22a0a1 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -401,7 +401,19 @@ During a snapshot operation, the connector will query each included table to pro Schema change events are applied to a "shadow" table and then swapped with the original table later.
This is an experimental feature, and subject to change in the future. - + + + + use.legacy.json.format + optional + true + Boolean + Whether to use legacy JSON format to cast JSON type data in binlog.
+ It determines whether to use the legacy JSON format when retrieving JSON type data in binlog. + If the user configures 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example, + JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}. + When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved. + diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 9bcd2a617..b05e67921 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -320,6 +320,18 @@ pipeline: Boolean Whether treat TINYINT(1) as boolean, by default is true. + + use.legacy.json.format + optional + true + Boolean + Whether to use legacy JSON format to cast JSON type data in binlog.
+ It determines whether to use the legacy JSON format when retrieving JSON type data in binlog. + If the user configures 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example, + JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}. + When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved. + + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 48819c50c..3072b86a0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -94,6 +94,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USE_LEGACY_JSON_FORMAT; import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; @@ -148,6 +149,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory { boolean scanBinlogNewlyAddedTableEnabled = config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES); + boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -198,7 +200,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory { .jdbcProperties(getJdbcProperties(configMap)) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges) - .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean); + .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean) + .useLegacyJsonFormat(useLegacyJsonFormat); List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); @@ -331,6 +334,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory { options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); options.add(METADATA_LIST); options.add(INCLUDE_COMMENTS_ENABLED); + options.add(USE_LEGACY_JSON_FORMAT); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 89878d0ea..b57ebe044 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -305,4 +305,12 @@ public class MySqlDataSourceOptions { .booleanType() .defaultValue(true) .withDescription("Whether treat TINYINT(1) as boolean, by default is true. "); + + @Experimental + public static final ConfigOption USE_LEGACY_JSON_FORMAT = + ConfigOptions.key("use.legacy.json.format") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java index dd15fdf6a..0df85eaa3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java @@ -106,11 +106,39 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { testCommonDataTypes(fullTypesMySql57Database); } + @Test + public void testMysql57JsonDataTypes() throws Throwable { + // Set `useLegacyJsonFormat` as false, so the json string will have no whitespace + // before value and after comma in json format be formatted with legacy format. + testJsonDataType(fullTypesMySql57Database, false); + } + + @Test + public void testMysql57JsonDataTypesWithUseLegacyJsonFormat() throws Throwable { + // Set `useLegacyJsonFormat` as true, so the json string will have whitespace before + // value and after comma in json format be formatted with legacy format. + testJsonDataType(fullTypesMySql57Database, true); + } + @Test public void testMySql8CommonDataTypes() throws Throwable { testCommonDataTypes(fullTypesMySql8Database); } + @Test + public void testMySql8JsonDataTypes() throws Throwable { + // Set `useLegacyJsonFormat` as false, so the json string will have no whitespace + // before value and after comma in json format be formatted with legacy format. + testJsonDataType(fullTypesMySql8Database, false); + } + + @Test + public void testMySql8JsonDataTypesWithUseLegacyJsonFormat() throws Throwable { + // Set `useLegacyJsonFormat` as true, so the json string will have whitespace before + // value and after comma in json format be formatted with legacy format. 
+ testJsonDataType(fullTypesMySql8Database, true); + } + @Test public void testMysql57TimeDataTypes() throws Throwable { RowType recordType = @@ -321,9 +349,13 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { }; database.createAndInitialize(); + Boolean useLegacyJsonFormat = true; CloseableIterator iterator = env.fromSource( - getFlinkSourceProvider(new String[] {"precision_types"}, database) + getFlinkSourceProvider( + new String[] {"precision_types"}, + database, + useLegacyJsonFormat) .getSource(), WatermarkStrategy.noWatermarks(), "Event-Source") @@ -351,9 +383,15 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { private void testCommonDataTypes(UniqueDatabase database) throws Exception { database.createAndInitialize(); + // Set useLegacyJsonFormat option as true, so the json string will have no whitespace before + // value and after comma in json format.be formatted with legacy format. + Boolean useLegacyJsonFormat = true; CloseableIterator iterator = env.fromSource( - getFlinkSourceProvider(new String[] {"common_types"}, database) + getFlinkSourceProvider( + new String[] {"common_types"}, + database, + useLegacyJsonFormat) .getSource(), WatermarkStrategy.noWatermarks(), "Event-Source") @@ -446,7 +484,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { } expectedSnapshot[30] = null; - // The json string from binlog will remove useless space + // Legacy format removes useless space in json string from binlog expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}"); Object[] expectedStreamRecord = expectedSnapshot; @@ -457,6 +495,66 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { .isEqualTo(expectedStreamRecord); } + private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat) + throws Exception { + database.createAndInitialize(); + CloseableIterator iterator = + env.fromSource( + getFlinkSourceProvider( + new String[] {"json_types"}, + database, + useLegacyJsonFormat) + .getSource(), + WatermarkStrategy.noWatermarks(), + "Event-Source") + .executeAndCollect(); + + Object[] expectedSnapshot = + new Object[] { + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), + BinaryStringData.fromString("{\"key1\": \"value1\"}"), + BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"), + BinaryStringData.fromString( + "[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"), + 1 + }; + + // skip CreateTableEvent + List snapshotResults = + MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); + Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES)) + .isEqualTo(expectedSnapshot); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;"); + } + + Object[] expectedStreamRecord = expectedSnapshot; + List streamResults = + MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; + RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after(); + + expectedSnapshot[4] = null; + + if (useLegacyJsonFormat) { + // removed whitespace before value and after comma in json format string value + Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES)) + 
.containsExactly( + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), + BinaryStringData.fromString("{\"key1\":\"value1\"}"), + BinaryStringData.fromString( + "{\"key1\":\"value1\",\"key2\":\"value2\"}"), + BinaryStringData.fromString( + "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"), + null); + } else { + Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES)) + .containsExactly(expectedStreamRecord); + } + } + private Instant toInstant(String ts) { return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant(); } @@ -468,9 +566,13 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { Object[] expectedStreamRecord) throws Exception { database.createAndInitialize(); + Boolean useLegacyJsonFormat = true; CloseableIterator iterator = env.fromSource( - getFlinkSourceProvider(new String[] {"time_types"}, database) + getFlinkSourceProvider( + new String[] {"time_types"}, + database, + useLegacyJsonFormat) .getSource(), WatermarkStrategy.noWatermarks(), "Event-Source") @@ -498,7 +600,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { } private FlinkSourceProvider getFlinkSourceProvider( - String[] captureTables, UniqueDatabase database) { + String[] captureTables, UniqueDatabase database, Boolean useLegacyJsonFormat) { String[] captureTableIds = Arrays.stream(captureTables) .map(tableName -> database.getDatabaseName() + "." + tableName) @@ -517,7 +619,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { .username(database.getUsername()) .password(database.getPassword()) .serverTimeZone(ZoneId.of("UTC").toString()) - .serverId(MySqSourceTestUtils.getServerId(env.getParallelism())); + .serverId(MySqSourceTestUtils.getServerId(env.getParallelism())) + .useLegacyJsonFormat(useLegacyJsonFormat); return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); } @@ -577,4 +680,12 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()); + + private static final RowType JSON_TYPES = + RowType.of( + DataTypes.DECIMAL(20, 0).notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.INT()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java index 96f3b5f92..92fafb568 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java @@ -222,6 +222,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { fullTypesMySql57Database.createAndInitialize(); String[] tables = new String[] {"precision_types"}; + MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, fullTypesMySql57Database, true); @@ -229,6 +230,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { metadataAccessor.getTableSchema( TableId.tableId( 
fullTypesMySql57Database.getDatabaseName(), "precision_types")); + Schema expectedSchema = Schema.newBuilder() .primaryKey("id") @@ -304,6 +306,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { metadataAccessor.getTableSchema( TableId.tableId( fullTypesMySql8Database.getDatabaseName(), "precision_types")); + Schema expectedSchema = Schema.newBuilder() .primaryKey("id") @@ -370,7 +373,8 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { private void testAccessDatabaseAndTable(UniqueDatabase database) { database.createAndInitialize(); - String[] tables = new String[] {"common_types", "time_types", "precision_types"}; + String[] tables = + new String[] {"common_types", "time_types", "precision_types", "json_types"}; MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true); assertThatThrownBy(metadataAccessor::listNamespaces) @@ -528,6 +532,66 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { return new MySqlMetadataAccessor(sourceConfig); } + @Test + public void testMysql57AccessJsonTypesSchema() { + fullTypesMySql57Database.createAndInitialize(); + + String[] tables = new String[] {"json_types"}; + MySqlMetadataAccessor metadataAccessor = + getMetadataAccessor(tables, fullTypesMySql57Database, true); + + Schema actualSchema = + metadataAccessor.getTableSchema( + TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types")); + Schema expectedSchema = + Schema.newBuilder() + .primaryKey("id") + .fromRowDataType( + RowType.of( + new DataType[] { + DataTypes.DECIMAL(20, 0).notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.INT() + }, + new String[] { + "id", "json_c0", "json_c1", "json_c2", "int_c", + })) + .build(); + assertThat(actualSchema).isEqualTo(expectedSchema); + } + + @Test + public void testMysql8AccessJsonTypesSchema() { + fullTypesMySql57Database.createAndInitialize(); + + String[] tables = new String[] {"json_types"}; + MySqlMetadataAccessor metadataAccessor = + getMetadataAccessor(tables, fullTypesMySql57Database, true); + + Schema actualSchema = + metadataAccessor.getTableSchema( + TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types")); + Schema expectedSchema = + Schema.newBuilder() + .primaryKey("id") + .fromRowDataType( + RowType.of( + new DataType[] { + DataTypes.DECIMAL(20, 0).notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.INT() + }, + new String[] { + "id", "json_c0", "json_c1", "json_c2", "int_c", + })) + .build(); + assertThat(actualSchema).isEqualTo(expectedSchema); + } + private MySqlSourceConfig getConfig( String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) { String[] captureTableIds = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql index 9699ed908..e7dc07e28 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql @@ -180,4 +180,22 @@ VALUES (DEFAULT, 23, 29, 31, - 37); \ No newline at end of file + 37); + +CREATE TABLE json_types +( + id SERIAL, + json_c0 JSON, + json_c1 JSON, + json_c2 JSON, + int_c INTEGER, 
+ PRIMARY KEY (id) +) DEFAULT CHARSET=utf8; + +INSERT INTO json_types +VALUES (DEFAULT, + '{"key1":"value1"}', + '{"key1":"value1","key2":"value2"}', + '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]', + 1 + ); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql index 8abe8868c..92548381f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql @@ -185,3 +185,21 @@ VALUES (DEFAULT, 29, 31, 37); + +CREATE TABLE json_types +( + id SERIAL, + json_c0 JSON, + json_c1 JSON, + json_c2 JSON, + int_c INTEGER, + PRIMARY KEY (id) +) DEFAULT CHARSET=utf8; + +INSERT INTO json_types +VALUES (DEFAULT, + '{"key1":"value1"}', + '{"key1":"value1","key2":"value2"}', + '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]', + 1 + ); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java new file mode 100644 index 000000000..7154496c0 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.shyiko.mysql.binlog.event.deserialization.json; + +import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Base64; + +import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig.useLegacyJsonFormat; + +/** + * Copied from mysql-binlog-connector-java 0.27.2 to add whitespace before value and after comma. + * + *

Line 105: Added whitespace before value, Line 213: Added whitespace after comma + */ +public class JsonStringFormatter implements JsonFormatter { + + /** + * Value used for lookup tables to indicate that matching characters do not need to be escaped. + */ + private static final int ESCAPE_NONE = 0; + + /** + * Value used for lookup tables to indicate that matching characters are to be escaped using + * standard escaping; for JSON this means (for example) using "backslash - u" escape method. + */ + private static final int ESCAPE_GENERIC = -1; + + /** + * A lookup table that determines which of the first 128 Unicode code points (single-byte UTF-8 + * characters) must be escaped. A value of '0' means no escaping is required; positive values + * must be escaped with a preceding backslash; and negative values that generic escaping (e.g., + */ + private static final int[] ESCAPES; + + static { + int[] escape = new int[128]; + // Generic escape for control characters ... + for (int i = 0; i < 32; ++i) { + escape[i] = ESCAPE_GENERIC; + } + // Backslash escape for other specific characters ... + escape['"'] = '"'; + escape['\\'] = '\\'; + // Escaping of slash is optional, so let's not add it + escape[0x08] = 'b'; + escape[0x09] = 't'; + escape[0x0C] = 'f'; + escape[0x0A] = 'n'; + escape[0x0D] = 'r'; + ESCAPES = escape; + } + + private static final char[] HEX_CODES = "0123456789ABCDEF".toCharArray(); + + private final StringBuilder sb = new StringBuilder(); + + @Override + public String toString() { + return getString(); + } + + public String getString() { + return sb.toString(); + } + + @Override + public void beginObject(int numElements) { + sb.append('{'); + } + + @Override + public void beginArray(int numElements) { + sb.append('['); + } + + @Override + public void endObject() { + sb.append('}'); + } + + @Override + public void endArray() { + sb.append(']'); + } + + @Override + public void name(String name) { + sb.append('"'); + appendString(name); + if (useLegacyJsonFormat) { + sb.append("\":"); + } else { + sb.append("\": "); + } + } + + @Override + public void value(String value) { + sb.append('"'); + appendString(value); + sb.append('"'); + } + + @Override + public void value(int value) { + sb.append(Integer.toString(value)); + } + + @Override + public void value(long value) { + sb.append(Long.toString(value)); + } + + @Override + public void value(double value) { + // Double's toString method will result in scientific notation and loss of precision + String str = Double.toString(value); + if (str.contains("E")) { + value(new BigDecimal(value)); + } else { + sb.append(str); + } + } + + @Override + public void value(BigInteger value) { + // Using the BigInteger.toString() method will result in scientific notation, so instead ... + value(new BigDecimal(value)); + } + + @Override + public void value(BigDecimal value) { + // Using the BigInteger.toString() method will result in scientific notation, so instead ... 
+ sb.append(value.toPlainString()); + } + + @Override + public void value(boolean value) { + sb.append(Boolean.toString(value)); + } + + @Override + public void valueNull() { + sb.append("null"); + } + + @Override + public void valueYear(int year) { + sb.append(year); + } + + @Override + public void valueDate(int year, int month, int day) { + sb.append('"'); + appendDate(year, month, day); + sb.append('"'); + } + + @Override + // checkstyle, please ignore ParameterNumber for the next line + public void valueDatetime( + int year, int month, int day, int hour, int min, int sec, int microSeconds) { + sb.append('"'); + appendDate(year, month, day); + sb.append(' '); + appendTime(hour, min, sec, microSeconds); + sb.append('"'); + } + + @Override + public void valueTime(int hour, int min, int sec, int microSeconds) { + sb.append('"'); + if (hour < 0) { + sb.append('-'); + hour = Math.abs(hour); + } + appendTime(hour, min, sec, microSeconds); + sb.append('"'); + } + + @Override + public void valueTimestamp(long secondsPastEpoch, int microSeconds) { + sb.append(secondsPastEpoch); + appendSixDigitUnsignedInt(microSeconds, false); + } + + @Override + public void valueOpaque(ColumnType type, byte[] value) { + sb.append('"'); + sb.append(Base64.getEncoder().encodeToString(value)); + sb.append('"'); + } + + @Override + public void nextEntry() { + if (useLegacyJsonFormat) { + sb.append(","); + } else { + sb.append(", "); + } + } + + /** + * Append a string by escaping any characters that must be escaped. + * + * @param original the string to be written; may not be null + */ + protected void appendString(String original) { + for (int i = 0, len = original.length(); i < len; ++i) { + char c = original.charAt(i); + int ch = c; + if (ch < 0 || ch >= ESCAPES.length || ESCAPES[ch] == 0) { + sb.append(c); + continue; + } + int escape = ESCAPES[ch]; + if (escape > 0) { // 2-char escape, fine + sb.append('\\'); + sb.append((char) escape); + } else { + unicodeEscape(ch); + } + } + } + + /** + * Append a generic Unicode escape for given character. + * + * @param charToEscape the character to escape + */ + private void unicodeEscape(int charToEscape) { + sb.append('\\'); + sb.append('u'); + if (charToEscape > 0xFF) { + int hi = (charToEscape >> 8) & 0xFF; + sb.append(HEX_CODES[hi >> 4]); + sb.append(HEX_CODES[hi & 0xF]); + charToEscape &= 0xFF; + } else { + sb.append('0'); + sb.append('0'); + } + // We know it's a control char, so only the last 2 chars are non-0 + sb.append(HEX_CODES[charToEscape >> 4]); + sb.append(HEX_CODES[charToEscape & 0xF]); + } + + protected void appendTwoDigitUnsignedInt(int value) { + assert value >= 0; + assert value < 100; + if (value < 10) { + sb.append("0").append(value); + } else { + sb.append(value); + } + } + + protected void appendFourDigitUnsignedInt(int value) { + if (value < 10) { + sb.append("000").append(value); + } else if (value < 100) { + sb.append("00").append(value); + } else if (value < 1000) { + sb.append("0").append(value); + } else { + sb.append(value); + } + } + + protected void appendSixDigitUnsignedInt(int value, boolean trimTrailingZeros) { + assert value > 0; + assert value < 1000000; + // Add prefixes if necessary ... + if (value < 10) { + sb.append("00000"); + } else if (value < 100) { + sb.append("0000"); + } else if (value < 1000) { + sb.append("000"); + } else if (value < 10000) { + sb.append("00"); + } else if (value < 100000) { + sb.append("0"); + } + if (trimTrailingZeros) { + // Remove any trailing 0's ... 
+ for (int i = 0; i != 6; ++i) { + if (value % 10 == 0) { + value /= 10; + } + } + sb.append(value); + } + } + + protected void appendDate(int year, int month, int day) { + if (year < 0) { + sb.append('-'); + year = Math.abs(year); + } + appendFourDigitUnsignedInt(year); + sb.append('-'); + appendTwoDigitUnsignedInt(month); + sb.append('-'); + appendTwoDigitUnsignedInt(day); + } + + protected void appendTime(int hour, int min, int sec, int microSeconds) { + appendTwoDigitUnsignedInt(hour); + sb.append(':'); + appendTwoDigitUnsignedInt(min); + sb.append(':'); + appendTwoDigitUnsignedInt(sec); + if (microSeconds != 0) { + sb.append('.'); + appendSixDigitUnsignedInt(microSeconds, true); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java index 49187838f..fb512a1eb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.cdc.debezium.DebeziumSourceFunction; @@ -157,6 +158,11 @@ public class MySqlSource { return this; } + public Builder useLegacyJsonFormat(boolean useLegacyJsonFormat) { + MySqlSourceConfig.useLegacyJsonFormat = useLegacyJsonFormat; + return this; + } + public DebeziumSourceFunction build() { Properties props = new Properties(); props.setProperty("connector.class", MySqlConnector.class.getCanonicalName()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 3fce1eb61..05081db57 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -253,6 +253,15 @@ public class MySqlSourceBuilder { return this; } + /** + * Whether to use legacy json format. The default value is true, which means there is no + * whitespace before value and after comma in json format. + */ + public MySqlSourceBuilder useLegacyJsonFormat(boolean useLegacyJsonFormat) { + this.configFactory.useLegacyJsonFormat(useLegacyJsonFormat); + return this; + } + /** * Whether to close idle readers at the end of the snapshot phase. This feature depends on * FLIP-147: Support Checkpoints After Tasks Finished. 
The flink version is required to be diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 56d510516..a7cd09ddf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -67,6 +67,7 @@ public class MySqlSourceConfig implements Serializable { private final Map chunkKeyColumns; private final boolean skipSnapshotBackfill; private final boolean parseOnLineSchemaChanges; + public static boolean useLegacyJsonFormat = true; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -103,7 +104,8 @@ public class MySqlSourceConfig implements Serializable { Map chunkKeyColumns, boolean skipSnapshotBackfill, boolean parseOnLineSchemaChanges, - boolean treatTinyInt1AsBoolean) { + boolean treatTinyInt1AsBoolean, + boolean useLegacyJsonFormat) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -133,6 +135,7 @@ public class MySqlSourceConfig implements Serializable { this.skipSnapshotBackfill = skipSnapshotBackfill; this.parseOnLineSchemaChanges = parseOnLineSchemaChanges; this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; + this.useLegacyJsonFormat = useLegacyJsonFormat; } public String getHostname() { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index c1459bf23..1ec841947 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -72,6 +72,7 @@ public class MySqlSourceConfigFactory implements Serializable { private boolean skipSnapshotBackfill = false; private boolean parseOnLineSchemaChanges = false; private boolean treatTinyInt1AsBoolean = true; + private boolean useLegacyJsonFormat = true; public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -278,6 +279,15 @@ public class MySqlSourceConfigFactory implements Serializable { return this; } + /** + * Whether to use legacy json format. The default value is true, which means there is no + * whitespace before value and after comma in json format. + */ + public MySqlSourceConfigFactory useLegacyJsonFormat(boolean useLegacyJsonFormat) { + this.useLegacyJsonFormat = useLegacyJsonFormat; + return this; + } + /** * Whether to close idle readers at the end of the snapshot phase. This feature depends on * FLIP-147: Support Checkpoints After Tasks Finished. 
The flink version is required to be @@ -399,6 +409,7 @@ public class MySqlSourceConfigFactory implements Serializable { chunkKeyColumns, skipSnapshotBackfill, parseOnLineSchemaChanges, - treatTinyInt1AsBoolean); + treatTinyInt1AsBoolean, + useLegacyJsonFormat); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index 87de37e05..5f2935932 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -270,4 +270,12 @@ public class MySqlSourceOptions { .defaultValue(false) .withDescription( "Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false."); + + @Experimental + public static final ConfigOption USE_LEGACY_JSON_FORMAT = + ConfigOptions.key("use.legacy.json.format") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java index 8d023ad30..3c5d6d63b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java @@ -99,6 +99,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final String chunkKeyColumn; final boolean skipSnapshotBackFill; final boolean parseOnlineSchemaChanges; + private final boolean useLegacyJsonFormat; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -137,7 +138,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat Duration heartbeatInterval, @Nullable String chunkKeyColumn, boolean skipSnapshotBackFill, - boolean parseOnlineSchemaChanges) { + boolean parseOnlineSchemaChanges, + boolean useLegacyJsonFormat) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -168,6 +170,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat this.heartbeatInterval = heartbeatInterval; this.chunkKeyColumn = chunkKeyColumn; this.skipSnapshotBackFill = skipSnapshotBackFill; + this.useLegacyJsonFormat = useLegacyJsonFormat; } @Override @@ -224,6 +227,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat .chunkKeyColumn(new ObjectPath(database, tableName), chunkKeyColumn) .skipSnapshotBackfill(skipSnapshotBackFill) .parseOnLineSchemaChanges(parseOnlineSchemaChanges) + .useLegacyJsonFormat(useLegacyJsonFormat) .build(); 
return SourceProvider.of(parallelSource); } else { @@ -310,7 +314,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat heartbeatInterval, chunkKeyColumn, skipSnapshotBackFill, - parseOnlineSchemaChanges); + parseOnlineSchemaChanges, + useLegacyJsonFormat); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index c3404db42..5dadc646b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -104,6 +104,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); boolean parseOnLineSchemaChanges = config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES); + boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); if (enableParallelRead) { validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn); @@ -148,7 +149,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { heartbeatInterval, chunkKeyColumn, skipSnapshotBackFill, - parseOnLineSchemaChanges); + parseOnLineSchemaChanges, + useLegacyJsonFormat); } @Override @@ -195,6 +197,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES); + options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index e5acc934f..6890ca78f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -56,6 +56,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOpt import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.USE_LEGACY_JSON_FORMAT; import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.junit.Assert.assertEquals; 
import static org.junit.Assert.assertThat; @@ -129,7 +130,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), null, SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -176,7 +178,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), "testCol", SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -219,7 +222,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), null, SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -260,7 +264,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), null, SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -280,6 +285,7 @@ public class MySqlTableSourceFactoryTest { options.put("scan.incremental.snapshot.chunk.key-column", "testCol"); options.put("scan.incremental.close-idle-reader.enabled", "true"); options.put("scan.incremental.snapshot.backfill.skip", "true"); + options.put("use.legacy.json.format", "true"); DynamicTableSource actualSource = createTableSource(options); Properties dbzProperties = new Properties(); @@ -317,7 +323,8 @@ public class MySqlTableSourceFactoryTest { Duration.ofMillis(15213), "testCol", true, - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + true); assertEquals(expectedSource, actualSource); assertTrue(actualSource instanceof MySqlTableSource); MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource; @@ -372,7 +379,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), null, SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -411,7 +419,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), null, SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -451,7 +460,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), null, SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -492,7 +502,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), null, SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); 
assertEquals(expectedSource, actualSource); } @@ -531,7 +542,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), null, SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -575,7 +587,8 @@ public class MySqlTableSourceFactoryTest { HEARTBEAT_INTERVAL.defaultValue(), null, SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), - PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql index e9145a7e8..b75fd69f0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql @@ -111,4 +111,4 @@ CREATE TABLE user_info balance DECIMAL(18, 2), balance2 DECIMAL(18, 2), PRIMARY KEY (user_id) -) DEFAULT CHARSET=utf8; \ No newline at end of file +) DEFAULT CHARSET=utf8; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql index 0a202f1ca..842cfcc51 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql @@ -100,4 +100,4 @@ VALUES (DEFAULT, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215 ST_GeomFromText('MULTIPOINT((1 1),(2 2))'), ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'), ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'), - ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))')); \ No newline at end of file + ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));
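
Usage note (not part of the patch): a minimal DataStream sketch of how the new `use.legacy.json.format` switch could be wired through the `MySqlSourceBuilder#useLegacyJsonFormat` method added above. Hostname, credentials, database, and table names below are placeholders.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UseLegacyJsonFormatExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")          // placeholder connection settings
                        .port(3306)
                        .databaseList("mydb")
                        .tableList("mydb.json_types")
                        .username("root")
                        .password("secret")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        // false: keep whitespace in JSON values read from binlog so that
                        // the snapshot and binlog phases emit identical strings
                        .useLegacyJsonFormat(false)
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
        env.execute("use.legacy.json.format example");
    }
}
```

With `useLegacyJsonFormat(false)`, JSON values read in the binlog phase keep the whitespace that the snapshot phase already emits, so both phases produce the same output for the same row.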
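Illustration (not part of the patch): a sketch that drives the copied `JsonStringFormatter` callbacks directly, bypassing binlog deserialization, to show the two output shapes controlled by the static `MySqlSourceConfig.useLegacyJsonFormat` flag. The method calls mirror the formatter API shown in the diff above.

```java
import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonStringFormatter;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;

public class JsonStringFormatterSketch {
    public static void main(String[] args) {
        // The flag is read by name() and nextEntry(); set it before formatting.
        MySqlSourceConfig.useLegacyJsonFormat = false;

        JsonStringFormatter formatter = new JsonStringFormatter();
        formatter.beginObject(2);
        formatter.name("key1");
        formatter.value("value1");
        formatter.nextEntry();
        formatter.name("key2");
        formatter.value("value2");
        formatter.endObject();

        // useLegacyJsonFormat = true  -> {"key1":"value1","key2":"value2"}
        // useLegacyJsonFormat = false -> {"key1": "value1", "key2": "value2"}
        System.out.println(formatter.getString());
    }
}
```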