[FLINK-36578][pipeline-connector/mysql] Introduce option to unify json type output between snapshot phase and binlog phase

This closes #3658
pull/3839/head
Seung-Min Lee 3 months ago committed by Leonard Xu
parent 947dffca31
commit a1c9c19d7d

@ -401,7 +401,19 @@ During a snapshot operation, the connector will query each included table to pro
Schema change events are applied to a "shadow" table and then swapped with the original table later.
<br>
This is an experimental feature, and subject to change in the future.
</td>
</td>
</tr>
<tr>
<td>use.legacy.json.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to use the legacy JSON format when converting JSON type data read from the binlog. <br>
When 'use.legacy.json.format' = 'true', the whitespace before values and after commas in the JSON type data is removed. For example,
JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}.
When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved, which is the same output as in the snapshot phase.
</td>
</tr>
</tbody>
</table>

@ -320,6 +320,18 @@ pipeline:
<td>Boolean</td>
<td>Whether treat TINYINT(1) as boolean, by default is true.</td>
</tr>
<tr>
<td>use.legacy.json.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to use the legacy JSON format when converting JSON type data read from the binlog. <br>
When 'use.legacy.json.format' = 'true', the whitespace before values and after commas in the JSON type data is removed. For example,
JSON type data {"key1": "value1", "key2": "value2"} in binlog would be converted to {"key1":"value1","key2":"value2"}.
When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved, which is the same output as in the snapshot phase.
</td>
</tr>
</tbody>
</table>
</div>

@ -94,6 +94,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USE_LEGACY_JSON_FORMAT;
import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@ -148,6 +149,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
boolean scanBinlogNewlyAddedTableEnabled =
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@ -198,7 +200,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean);
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat);
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
@ -331,6 +334,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
options.add(METADATA_LIST);
options.add(INCLUDE_COMMENTS_ENABLED);
options.add(USE_LEGACY_JSON_FORMAT);
return options;
}

@ -305,4 +305,12 @@ public class MySqlDataSourceOptions {
.booleanType()
.defaultValue(true)
.withDescription("Whether treat TINYINT(1) as boolean, by default is true. ");
@Experimental
public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
ConfigOptions.key("use.legacy.json.format")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
}

@ -106,11 +106,39 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
testCommonDataTypes(fullTypesMySql57Database);
}
@Test
public void testMysql57JsonDataTypes() throws Throwable {
// Set `useLegacyJsonFormat` as false, so the json string read from binlog keeps the
// whitespace before value and after comma, i.e. it is NOT formatted with legacy format.
testJsonDataType(fullTypesMySql57Database, false);
}
@Test
public void testMysql57JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
// Set `useLegacyJsonFormat` as true, so the json string read from binlog is formatted with
// legacy format, i.e. without whitespace before value and after comma.
testJsonDataType(fullTypesMySql57Database, true);
}
@Test
public void testMySql8CommonDataTypes() throws Throwable {
testCommonDataTypes(fullTypesMySql8Database);
}
@Test
public void testMySql8JsonDataTypes() throws Throwable {
// Set `useLegacyJsonFormat` as false, so the json string read from binlog keeps the
// whitespace before value and after comma, i.e. it is NOT formatted with legacy format.
testJsonDataType(fullTypesMySql8Database, false);
}
@Test
public void testMySql8JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
// Set `useLegacyJsonFormat` as true, so the json string read from binlog is formatted with
// legacy format, i.e. without whitespace before value and after comma.
testJsonDataType(fullTypesMySql8Database, true);
}
@Test
public void testMysql57TimeDataTypes() throws Throwable {
RowType recordType =
@ -321,9 +349,13 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
};
database.createAndInitialize();
Boolean useLegacyJsonFormat = true;
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(new String[] {"precision_types"}, database)
getFlinkSourceProvider(
new String[] {"precision_types"},
database,
useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
@ -351,9 +383,15 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
private void testCommonDataTypes(UniqueDatabase database) throws Exception {
database.createAndInitialize();
// Set useLegacyJsonFormat option as true, so the json string read from binlog is formatted
// with legacy format, i.e. without whitespace before value and after comma.
Boolean useLegacyJsonFormat = true;
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(new String[] {"common_types"}, database)
getFlinkSourceProvider(
new String[] {"common_types"},
database,
useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
@ -446,7 +484,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
}
expectedSnapshot[30] = null;
// The json string from binlog will remove useless space
// Legacy format removes useless space in json string from binlog
expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
Object[] expectedStreamRecord = expectedSnapshot;
@ -457,6 +495,66 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
.isEqualTo(expectedStreamRecord);
}
/**
 * Reads the single row of the {@code json_types} table once from the snapshot and once from the
 * binlog (after an UPDATE) and checks how the JSON columns are rendered in each phase.
 *
 * @param database the database fixture that contains the {@code json_types} table
 * @param useLegacyJsonFormat value passed to the source; when true the binlog record is
 *     expected without whitespace after ':' and ',', otherwise it is expected to look exactly
 *     like the snapshot record
 * @throws Exception if the job, the JDBC update or closing the result iterator fails
 */
private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat)
        throws Exception {
    database.createAndInitialize();

    // JSON columns as returned by the snapshot query: whitespace after ':' and ','.
    Object[] expectedSnapshot =
            new Object[] {
                DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
                BinaryStringData.fromString("{\"key1\": \"value1\"}"),
                BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"),
                BinaryStringData.fromString(
                        "[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
                1
            };

    // The UPDATE below only nulls int_c, so the non-legacy binlog record is the snapshot
    // record with the last field cleared. Use a copy so both expectations stay independent.
    Object[] expectedStreamRecord = expectedSnapshot.clone();
    expectedStreamRecord[4] = null;

    // The iterator returned by executeAndCollect() must be closed to release the job.
    try (CloseableIterator<Event> iterator =
            env.fromSource(
                            getFlinkSourceProvider(
                                            new String[] {"json_types"},
                                            database,
                                            useLegacyJsonFormat)
                                    .getSource(),
                            WatermarkStrategy.noWatermarks(),
                            "Event-Source")
                    .executeAndCollect()) {

        // skip CreateTableEvent
        List<Event> snapshotResults =
                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
        RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
        Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES))
                .isEqualTo(expectedSnapshot);

        try (Connection connection = database.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;");
        }

        List<Event> streamResults =
                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
        RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();

        if (useLegacyJsonFormat) {
            // removed whitespace before value and after comma in json format string value
            Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
                    .containsExactly(
                            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
                            BinaryStringData.fromString("{\"key1\":\"value1\"}"),
                            BinaryStringData.fromString(
                                    "{\"key1\":\"value1\",\"key2\":\"value2\"}"),
                            BinaryStringData.fromString(
                                    "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
                            null);
        } else {
            Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
                    .containsExactly(expectedStreamRecord);
        }
    }
}
private Instant toInstant(String ts) {
return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant();
}
@ -468,9 +566,13 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
Object[] expectedStreamRecord)
throws Exception {
database.createAndInitialize();
Boolean useLegacyJsonFormat = true;
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(new String[] {"time_types"}, database)
getFlinkSourceProvider(
new String[] {"time_types"},
database,
useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
@ -498,7 +600,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
}
private FlinkSourceProvider getFlinkSourceProvider(
String[] captureTables, UniqueDatabase database) {
String[] captureTables, UniqueDatabase database, Boolean useLegacyJsonFormat) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> database.getDatabaseName() + "." + tableName)
@ -517,7 +619,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
.username(database.getUsername())
.password(database.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
.serverId(MySqSourceTestUtils.getServerId(env.getParallelism()));
.serverId(MySqSourceTestUtils.getServerId(env.getParallelism()))
.useLegacyJsonFormat(useLegacyJsonFormat);
return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
}
@ -577,4 +680,12 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING());
private static final RowType JSON_TYPES =
RowType.of(
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT());
}

@ -222,6 +222,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
fullTypesMySql57Database.createAndInitialize();
String[] tables = new String[] {"precision_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql57Database, true);
@ -229,6 +230,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
metadataAccessor.getTableSchema(
TableId.tableId(
fullTypesMySql57Database.getDatabaseName(), "precision_types"));
Schema expectedSchema =
Schema.newBuilder()
.primaryKey("id")
@ -304,6 +306,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
metadataAccessor.getTableSchema(
TableId.tableId(
fullTypesMySql8Database.getDatabaseName(), "precision_types"));
Schema expectedSchema =
Schema.newBuilder()
.primaryKey("id")
@ -370,7 +373,8 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();
String[] tables = new String[] {"common_types", "time_types", "precision_types"};
String[] tables =
new String[] {"common_types", "time_types", "precision_types", "json_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true);
assertThatThrownBy(metadataAccessor::listNamespaces)
@ -528,6 +532,66 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
return new MySqlMetadataAccessor(sourceConfig);
}
/** Verifies the schema reported for the {@code json_types} table on the MySQL 5.7 fixture. */
@Test
public void testMysql57AccessJsonTypesSchema() {
    fullTypesMySql57Database.createAndInitialize();

    MySqlMetadataAccessor metadataAccessor =
            getMetadataAccessor(new String[] {"json_types"}, fullTypesMySql57Database, true);
    TableId jsonTypesTable =
            TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types");
    Schema actualSchema = metadataAccessor.getTableSchema(jsonTypesTable);

    // Expected columns, in order: the key, three JSON columns read as strings, one INT.
    DataType[] columnTypes = {
        DataTypes.DECIMAL(20, 0).notNull(),
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.INT()
    };
    String[] columnNames = {"id", "json_c0", "json_c1", "json_c2", "int_c"};
    Schema expectedSchema =
            Schema.newBuilder()
                    .primaryKey("id")
                    .fromRowDataType(RowType.of(columnTypes, columnNames))
                    .build();

    assertThat(actualSchema).isEqualTo(expectedSchema);
}
/** Verifies the schema reported for the {@code json_types} table on the MySQL 8 fixture. */
@Test
public void testMysql8AccessJsonTypesSchema() {
    // Must use the MySQL 8 fixture here; using the 5.7 one would only repeat
    // testMysql57AccessJsonTypesSchema and leave MySQL 8 untested.
    fullTypesMySql8Database.createAndInitialize();

    String[] tables = new String[] {"json_types"};
    MySqlMetadataAccessor metadataAccessor =
            getMetadataAccessor(tables, fullTypesMySql8Database, true);

    Schema actualSchema =
            metadataAccessor.getTableSchema(
                    TableId.tableId(fullTypesMySql8Database.getDatabaseName(), "json_types"));

    Schema expectedSchema =
            Schema.newBuilder()
                    .primaryKey("id")
                    .fromRowDataType(
                            RowType.of(
                                    new DataType[] {
                                        DataTypes.DECIMAL(20, 0).notNull(),
                                        DataTypes.STRING(),
                                        DataTypes.STRING(),
                                        DataTypes.STRING(),
                                        DataTypes.INT()
                                    },
                                    new String[] {
                                        "id", "json_c0", "json_c1", "json_c2", "int_c",
                                    }))
                    .build();
    assertThat(actualSchema).isEqualTo(expectedSchema);
}
private MySqlSourceConfig getConfig(
String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) {
String[] captureTableIds =

@ -180,4 +180,22 @@ VALUES (DEFAULT,
23,
29,
31,
37);
37);
CREATE TABLE json_types
(
id SERIAL,
json_c0 JSON,
json_c1 JSON,
json_c2 JSON,
int_c INTEGER,
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
INSERT INTO json_types
VALUES (DEFAULT,
'{"key1":"value1"}',
'{"key1":"value1","key2":"value2"}',
'[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]',
1
);

@ -185,3 +185,21 @@ VALUES (DEFAULT,
29,
31,
37);
CREATE TABLE json_types
(
id SERIAL,
json_c0 JSON,
json_c1 JSON,
json_c2 JSON,
int_c INTEGER,
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
INSERT INTO json_types
VALUES (DEFAULT,
'{"key1":"value1"}',
'{"key1":"value1","key2":"value2"}',
'[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]',
1
);

@ -0,0 +1,337 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.shyiko.mysql.binlog.event.deserialization.json;
import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig.useLegacyJsonFormat;
/**
 * Copied from mysql-binlog-connector-java 0.27.2 to optionally add whitespace before value and
 * after comma.
 *
 * <p>Differences from the copied class:
 *
 * <ul>
 *   <li>{@link #name(String)} appends a space after the colon unless the legacy format is used.
 *   <li>{@link #nextEntry()} appends a space after the comma unless the legacy format is used.
 *   <li>{@link #appendSixDigitUnsignedInt(int, boolean)} always writes the value (it used to
 *       write only the zero padding when trailing zeros were not trimmed) and accepts 0.
 * </ul>
 *
 * <p>The format is selected by the static flag {@code MySqlSourceConfig.useLegacyJsonFormat},
 * which is read each time a name or an entry separator is written.
 */
public class JsonStringFormatter implements JsonFormatter {

    /**
     * Value used for lookup tables to indicate that matching characters do not need to be escaped.
     */
    private static final int ESCAPE_NONE = 0;

    /**
     * Value used for lookup tables to indicate that matching characters are to be escaped using
     * standard escaping; for JSON this means (for example) using "backslash - u" escape method.
     */
    private static final int ESCAPE_GENERIC = -1;

    /**
     * A lookup table that determines which of the first 128 Unicode code points (single-byte UTF-8
     * characters) must be escaped. A value of '0' means no escaping is required; positive values
     * must be escaped with a preceding backslash; and negative values mean that generic escaping
     * (e.g., "backslash - u" followed by four hex digits) must be used.
     */
    private static final int[] ESCAPES;

    static {
        int[] escape = new int[128];
        // Generic escape for control characters ...
        for (int i = 0; i < 32; ++i) {
            escape[i] = ESCAPE_GENERIC;
        }
        // Backslash escape for other specific characters ...
        escape['"'] = '"';
        escape['\\'] = '\\';
        // Escaping of slash is optional, so let's not add it
        escape[0x08] = 'b';
        escape[0x09] = 't';
        escape[0x0C] = 'f';
        escape[0x0A] = 'n';
        escape[0x0D] = 'r';
        ESCAPES = escape;
    }

    private static final char[] HEX_CODES = "0123456789ABCDEF".toCharArray();

    /** Accumulates the JSON text produced by the callbacks below. */
    private final StringBuilder sb = new StringBuilder();

    @Override
    public String toString() {
        return getString();
    }

    /** Returns the JSON text written so far. */
    public String getString() {
        return sb.toString();
    }

    @Override
    public void beginObject(int numElements) {
        sb.append('{');
    }

    @Override
    public void beginArray(int numElements) {
        sb.append('[');
    }

    @Override
    public void endObject() {
        sb.append('}');
    }

    @Override
    public void endArray() {
        sb.append(']');
    }

    @Override
    public void name(String name) {
        sb.append('"');
        appendString(name);
        if (useLegacyJsonFormat) {
            sb.append("\":");
        } else {
            // Non-legacy format: keep a whitespace between the colon and the value.
            sb.append("\": ");
        }
    }

    @Override
    public void value(String value) {
        sb.append('"');
        appendString(value);
        sb.append('"');
    }

    @Override
    public void value(int value) {
        sb.append(value);
    }

    @Override
    public void value(long value) {
        sb.append(value);
    }

    @Override
    public void value(double value) {
        // Double's toString method will result in scientific notation and loss of precision
        String str = Double.toString(value);
        if (str.contains("E")) {
            value(new BigDecimal(value));
        } else {
            sb.append(str);
        }
    }

    @Override
    public void value(BigInteger value) {
        // Using the BigInteger.toString() method will result in scientific notation, so instead ...
        value(new BigDecimal(value));
    }

    @Override
    public void value(BigDecimal value) {
        // BigDecimal.toString() may use scientific notation, so print the plain form instead
        sb.append(value.toPlainString());
    }

    @Override
    public void value(boolean value) {
        sb.append(value);
    }

    @Override
    public void valueNull() {
        sb.append("null");
    }

    @Override
    public void valueYear(int year) {
        sb.append(year);
    }

    @Override
    public void valueDate(int year, int month, int day) {
        sb.append('"');
        appendDate(year, month, day);
        sb.append('"');
    }

    @Override
    // checkstyle, please ignore ParameterNumber for the next line
    public void valueDatetime(
            int year, int month, int day, int hour, int min, int sec, int microSeconds) {
        sb.append('"');
        appendDate(year, month, day);
        sb.append(' ');
        appendTime(hour, min, sec, microSeconds);
        sb.append('"');
    }

    @Override
    public void valueTime(int hour, int min, int sec, int microSeconds) {
        sb.append('"');
        if (hour < 0) {
            sb.append('-');
            hour = Math.abs(hour);
        }
        appendTime(hour, min, sec, microSeconds);
        sb.append('"');
    }

    @Override
    public void valueTimestamp(long secondsPastEpoch, int microSeconds) {
        // Written as the seconds immediately followed by six microsecond digits.
        sb.append(secondsPastEpoch);
        appendSixDigitUnsignedInt(microSeconds, false);
    }

    @Override
    public void valueOpaque(ColumnType type, byte[] value) {
        sb.append('"');
        sb.append(Base64.getEncoder().encodeToString(value));
        sb.append('"');
    }

    @Override
    public void nextEntry() {
        if (useLegacyJsonFormat) {
            sb.append(",");
        } else {
            // Non-legacy format: keep a whitespace after the comma.
            sb.append(", ");
        }
    }

    /**
     * Append a string by escaping any characters that must be escaped.
     *
     * @param original the string to be written; may not be null
     */
    protected void appendString(String original) {
        for (int i = 0, len = original.length(); i < len; ++i) {
            char c = original.charAt(i);
            int escape = c < ESCAPES.length ? ESCAPES[c] : ESCAPE_NONE;
            if (escape == ESCAPE_NONE) {
                sb.append(c);
            } else if (escape > 0) { // 2-char escape, fine
                sb.append('\\');
                sb.append((char) escape);
            } else {
                unicodeEscape(c);
            }
        }
    }

    /**
     * Append a generic Unicode escape for given character.
     *
     * @param charToEscape the character to escape
     */
    private void unicodeEscape(int charToEscape) {
        sb.append('\\');
        sb.append('u');
        if (charToEscape > 0xFF) {
            int hi = (charToEscape >> 8) & 0xFF;
            sb.append(HEX_CODES[hi >> 4]);
            sb.append(HEX_CODES[hi & 0xF]);
            charToEscape &= 0xFF;
        } else {
            sb.append('0');
            sb.append('0');
        }
        // We know it's a control char, so only the last 2 chars are non-0
        sb.append(HEX_CODES[charToEscape >> 4]);
        sb.append(HEX_CODES[charToEscape & 0xF]);
    }

    /** Appends {@code value} (expected in 0..99) left-padded with a zero to two digits. */
    protected void appendTwoDigitUnsignedInt(int value) {
        assert value >= 0;
        assert value < 100;
        if (value < 10) {
            sb.append("0").append(value);
        } else {
            sb.append(value);
        }
    }

    /** Appends {@code value} left-padded with zeros to at least four digits. */
    protected void appendFourDigitUnsignedInt(int value) {
        if (value < 10) {
            sb.append("000").append(value);
        } else if (value < 100) {
            sb.append("00").append(value);
        } else if (value < 1000) {
            sb.append("0").append(value);
        } else {
            sb.append(value);
        }
    }

    /**
     * Appends {@code value} left-padded with zeros to six digits.
     *
     * @param value the number to write; expected in 0..999999
     * @param trimTrailingZeros when true, trailing zeros are dropped after padding (used for the
     *     fractional part of a time, where they carry no information)
     */
    protected void appendSixDigitUnsignedInt(int value, boolean trimTrailingZeros) {
        assert value >= 0;
        assert value < 1000000;
        // Add prefixes if necessary ...
        if (value < 10) {
            sb.append("00000");
        } else if (value < 100) {
            sb.append("0000");
        } else if (value < 1000) {
            sb.append("000");
        } else if (value < 10000) {
            sb.append("00");
        } else if (value < 100000) {
            sb.append("0");
        }
        if (trimTrailingZeros) {
            // Remove any trailing 0's ...
            while (value != 0 && value % 10 == 0) {
                value /= 10;
            }
        }
        // Always write the digits; the padding alone is not a valid rendering of the value.
        sb.append(value);
    }

    /** Appends a date as {@code yyyy-MM-dd}, with a leading '-' for negative years. */
    protected void appendDate(int year, int month, int day) {
        if (year < 0) {
            sb.append('-');
            year = Math.abs(year);
        }
        appendFourDigitUnsignedInt(year);
        sb.append('-');
        appendTwoDigitUnsignedInt(month);
        sb.append('-');
        appendTwoDigitUnsignedInt(day);
    }

    /** Appends a time as {@code HH:mm:ss}, plus a fractional part when microseconds are non-zero. */
    protected void appendTime(int hour, int min, int sec, int microSeconds) {
        appendTwoDigitUnsignedInt(hour);
        sb.append(':');
        appendTwoDigitUnsignedInt(min);
        sb.append(':');
        appendTwoDigitUnsignedInt(sec);
        if (microSeconds != 0) {
            sb.append('.');
            appendSixDigitUnsignedInt(microSeconds, true);
        }
    }
}

@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.mysql;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
@ -157,6 +158,11 @@ public class MySqlSource {
return this;
}
/**
 * Whether to use legacy json format, i.e. no whitespace before value and after comma.
 *
 * <p>NOTE(review): unlike the other builder options this is not kept in the builder; it
 * assigns the static field {@code MySqlSourceConfig.useLegacyJsonFormat}, so the last call
 * wins for everything that reads that field in the same JVM.
 */
public Builder<T> useLegacyJsonFormat(boolean useLegacyJsonFormat) {
MySqlSourceConfig.useLegacyJsonFormat = useLegacyJsonFormat;
return this;
}
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());

@ -253,6 +253,15 @@ public class MySqlSourceBuilder<T> {
return this;
}
/**
 * Whether to use legacy json format. The default value is true, which means there is no
 * whitespace before value and after comma in json format. When set to false, a whitespace is
 * written before each value and after each comma.
 */
public MySqlSourceBuilder<T> useLegacyJsonFormat(boolean useLegacyJsonFormat) {
this.configFactory.useLegacyJsonFormat(useLegacyJsonFormat);
return this;
}
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be

@ -67,6 +67,7 @@ public class MySqlSourceConfig implements Serializable {
private final Map<ObjectPath, String> chunkKeyColumns;
private final boolean skipSnapshotBackfill;
private final boolean parseOnLineSchemaChanges;
public static boolean useLegacyJsonFormat = true;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@ -103,7 +104,8 @@ public class MySqlSourceConfig implements Serializable {
Map<ObjectPath, String> chunkKeyColumns,
boolean skipSnapshotBackfill,
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean) {
boolean treatTinyInt1AsBoolean,
boolean useLegacyJsonFormat) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@ -133,6 +135,7 @@ public class MySqlSourceConfig implements Serializable {
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
this.useLegacyJsonFormat = useLegacyJsonFormat;
}
public String getHostname() {

@ -72,6 +72,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private boolean skipSnapshotBackfill = false;
private boolean parseOnLineSchemaChanges = false;
private boolean treatTinyInt1AsBoolean = true;
private boolean useLegacyJsonFormat = true;
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@ -278,6 +279,15 @@ public class MySqlSourceConfigFactory implements Serializable {
return this;
}
/**
 * Whether to use legacy json format. The default value is true, which means there is no
 * whitespace before value and after comma in json format. When set to false, a whitespace is
 * written before each value and after each comma.
 */
public MySqlSourceConfigFactory useLegacyJsonFormat(boolean useLegacyJsonFormat) {
this.useLegacyJsonFormat = useLegacyJsonFormat;
return this;
}
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be
@ -399,6 +409,7 @@ public class MySqlSourceConfigFactory implements Serializable {
chunkKeyColumns,
skipSnapshotBackfill,
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean);
treatTinyInt1AsBoolean,
useLegacyJsonFormat);
}
}

@ -270,4 +270,12 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
@Experimental
public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
ConfigOptions.key("use.legacy.json.format")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
}

@ -99,6 +99,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final String chunkKeyColumn;
final boolean skipSnapshotBackFill;
final boolean parseOnlineSchemaChanges;
private final boolean useLegacyJsonFormat;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@ -137,7 +138,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
Duration heartbeatInterval,
@Nullable String chunkKeyColumn,
boolean skipSnapshotBackFill,
boolean parseOnlineSchemaChanges) {
boolean parseOnlineSchemaChanges,
boolean useLegacyJsonFormat) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@ -168,6 +170,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.heartbeatInterval = heartbeatInterval;
this.chunkKeyColumn = chunkKeyColumn;
this.skipSnapshotBackFill = skipSnapshotBackFill;
this.useLegacyJsonFormat = useLegacyJsonFormat;
}
@Override
@ -224,6 +227,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.chunkKeyColumn(new ObjectPath(database, tableName), chunkKeyColumn)
.skipSnapshotBackfill(skipSnapshotBackFill)
.parseOnLineSchemaChanges(parseOnlineSchemaChanges)
.useLegacyJsonFormat(useLegacyJsonFormat)
.build();
return SourceProvider.of(parallelSource);
} else {
@ -310,7 +314,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
heartbeatInterval,
chunkKeyColumn,
skipSnapshotBackFill,
parseOnlineSchemaChanges);
parseOnlineSchemaChanges,
useLegacyJsonFormat);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;

@ -104,6 +104,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean parseOnLineSchemaChanges =
config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
@ -148,7 +149,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
heartbeatInterval,
chunkKeyColumn,
skipSnapshotBackFill,
parseOnLineSchemaChanges);
parseOnLineSchemaChanges,
useLegacyJsonFormat);
}
@Override
@ -195,6 +197,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
return options;
}

@ -56,6 +56,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOpt
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.USE_LEGACY_JSON_FORMAT;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@ -129,7 +130,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -176,7 +178,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
"testCol",
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -219,7 +222,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -260,7 +264,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -280,6 +285,7 @@ public class MySqlTableSourceFactoryTest {
options.put("scan.incremental.snapshot.chunk.key-column", "testCol");
options.put("scan.incremental.close-idle-reader.enabled", "true");
options.put("scan.incremental.snapshot.backfill.skip", "true");
options.put("use.legacy.json.format", "true");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
@ -317,7 +323,8 @@ public class MySqlTableSourceFactoryTest {
Duration.ofMillis(15213),
"testCol",
true,
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
true);
assertEquals(expectedSource, actualSource);
assertTrue(actualSource instanceof MySqlTableSource);
MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource;
@ -372,7 +379,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -411,7 +419,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -451,7 +460,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -492,7 +502,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -531,7 +542,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -575,7 +587,8 @@ public class MySqlTableSourceFactoryTest {
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");

@ -111,4 +111,4 @@ CREATE TABLE user_info
balance DECIMAL(18, 2),
balance2 DECIMAL(18, 2),
PRIMARY KEY (user_id)
) DEFAULT CHARSET=utf8;
) DEFAULT CHARSET=utf8;

@ -100,4 +100,4 @@ VALUES (DEFAULT, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215
ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),
ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));
ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));

Loading…
Cancel
Save