[FLINK-36282][pipeline-connector][cdc-connector][mysql]fix incorrect data type of TINYINT(1) in mysql pipeline connector (#3608)

* [FLINK-36282][pipeline-connector][cdc-connector][mysql]fix incorrect data type of TINYINT(1) in mysql pipeline connector

* reformat code

* Update MySqlPipelineITCase.java

* pass a boolean value instead of Properties

* update FAQ

* add a method to get tinyInt1isBit

* add new cdc config `treat-tinyint1-as-boolean`

* Update MySqlChunkSplitter.java

* change param name
pull/3864/head
North Lin 1 month ago committed by GitHub
parent 47e5cd6d89
commit 9ad071c1fa

@ -305,6 +305,13 @@ pipeline:
<td>Boolean</td>
<td>Whether to synchronize table and column comments. Disabled by default. Note: enabling this feature affects memory usage.</td>
</tr>
<tr>
<td>treat-tinyint1-as-boolean.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to treat the TINYINT(1) type as Boolean. Defaults to true.</td>
</tr>
</tbody>
</table>
</div>

@ -207,6 +207,19 @@ restart-strategy.fixed-delay.delay= 30s
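1. The `tableList` option requires table names qualified with the database name, rather than bare table names, in the DataStream API. For the MySQL CDC source, the `tableList` option value should look like `my_db.my_table`.
2. To synchronize the whole `my_db` database except the `products` and `orders` tables, the `tableList` option value should look like `'my_db.(?!products|orders).*'`.
### Q16: A MySQL source table has a TINYINT(1) column in which some rows hold values greater than 1, yet the downstream of the pipeline job receives true/false. Why?
This happens because the MySQL connection parameter `tinyInt1isBit` defaults to `true`, and Flink CDC versions before 3.3.0 did not handle this parameter, so TINYINT(1) data was parsed as boolean values.
To receive the actual values, upgrade CDC to 3.3.0+ and add the configuration `treat-tinyint1-as-boolean.enabled: false` to the source node.
For example: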
```yaml
source:
  type: mysql
  ...
  treat-tinyint1-as-boolean.enabled: false
sink:
  type: ...
```
## Postgres CDC FAQ
### Q1: The PG server's disk usage is high and the WAL is not released. What is the cause?

@ -313,6 +313,13 @@ pipeline:
<td>Whether to include table and column comments. Defaults to false; if set to true, table and column comments are sent.<br>
Note: enabling this option affects memory usage.</td>
</tr>
<tr>
<td>treat-tinyint1-as-boolean.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to treat TINYINT(1) as boolean. Defaults to true.</td>
</tr>
</tbody>
</table>
</div>
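As a rough illustration of what this option does on the connector side, the sketch below mirrors the factory logic in this change: when the option is `false`, the MySQL JDBC property `tinyInt1isBit` is forced to `false`. The class and method names are hypothetical, and the `jdbc.properties.` prefix value is an assumption about the connector's `PROPERTIES_PREFIX` constant.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the connector's MySqlDataSourceFactory.
final class TinyInt1JdbcPropertySketch {
    // Assumption: keys with this prefix are forwarded to the JDBC driver.
    static final String PROPERTIES_PREFIX = "jdbc.properties.";

    /** Returns a copy of configMap with tinyInt1isBit=false added when the option is disabled. */
    static Map<String, String> applyOption(
            Map<String, String> configMap, boolean treatTinyInt1AsBoolean) {
        Map<String, String> result = new HashMap<>(configMap);
        if (!treatTinyInt1AsBoolean) {
            // Ask the MySQL driver not to report TINYINT(1) as BIT.
            result.put(PROPERTIES_PREFIX + "tinyInt1isBit", "false");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(applyOption(new HashMap<>(), true));
        System.out.println(applyOption(new HashMap<>(), false));
    }
}
```

With the default (`true`) the map is left untouched, so existing jobs keep their current behaviour.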

@ -210,6 +210,19 @@ The reason for this problem is that the reading of the full volume phase of the
1. The `tableList` option requires table names qualified with the database name, rather than bare table names, in the DataStream API. For the MySQL CDC source, the `tableList` option value should look like `my_db.my_table`.
2. If you need to synchronize the whole `my_db` database excluding the `products` and `orders` tables, the `tableList` option value should look like `'my_db.(?!products|orders).*'`.
### Q16: A MySQL source table has a TINYINT(1) column in which some rows contain values greater than 1, yet downstream receives this data as true/false in the pipeline job. Why does this happen?
This is because the MySQL connection parameter `tinyInt1isBit` defaults to `true`, and Flink CDC versions before 3.3.0 did not handle this parameter, so TINYINT(1) data was interpreted as boolean values.
To receive the actual values, upgrade CDC to 3.3.0+ and add the configuration `treat-tinyint1-as-boolean.enabled: false` to the source node.
For example:
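To see how such an exclusion pattern behaves, here is a minimal sketch using plain `java.util.regex`. It assumes the intended pattern is `my_db.(?!products|orders).*` (with a `|` between the two table names), and it is not the connector's actual table-matching code; the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

// Illustrative sketch only; the connector may split and match table IDs differently.
final class TableListPatternSketch {
    // Negative lookahead: reject table names that start with "products" or "orders".
    static final Pattern TABLE_LIST = Pattern.compile("my_db.(?!products|orders).*");

    /** Returns true when the fully qualified table ID matches the whole pattern. */
    static boolean isCaptured(String tableId) {
        return TABLE_LIST.matcher(tableId).matches();
    }

    public static void main(String[] args) {
        for (String id : new String[] {"my_db.customers", "my_db.products", "my_db.orders"}) {
            System.out.println(id + " captured: " + isCaptured(id));
        }
    }
}
```

Note that the lookahead excludes any table whose name merely begins with `products` or `orders`, such as `products_on_hand`.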
```yaml
source:
  type: mysql
  ...
  treat-tinyint1-as-boolean.enabled: false
sink:
  type: ...
```
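The type decision behind this option can be sketched in a few lines. The rule below mirrors the changed condition in `MySqlTypeUtils` in this commit (BOOLEAN only when the column length is 1 and the flag is on); the `FlinkType` enum and `mapTinyint` method are hypothetical stand-ins, not the connector's API.

```java
// Illustrative sketch only; FlinkType and mapTinyint are hypothetical names.
final class TinyInt1MappingSketch {
    enum FlinkType { BOOLEAN, TINYINT }

    /** TINYINT maps to BOOLEAN only when its length is 1 and the option is enabled. */
    static FlinkType mapTinyint(int columnLength, boolean treatTinyInt1AsBoolean) {
        return (columnLength == 1 && treatTinyInt1AsBoolean)
                ? FlinkType.BOOLEAN
                : FlinkType.TINYINT;
    }

    public static void main(String[] args) {
        System.out.println(mapTinyint(1, true));  // option on, TINYINT(1)
        System.out.println(mapTinyint(1, false)); // option off, TINYINT(1)
        System.out.println(mapTinyint(4, true));  // option on, TINYINT(4)
    }
}
```

Columns declared with any other display width stay TINYINT regardless of the option.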
## Postgres CDC FAQ
### Q1: The PG server's disk usage is high. Why is the WAL not released?

@ -41,6 +41,7 @@ import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;
import com.mysql.cj.conf.PropertyKey;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import org.slf4j.Logger;
@ -91,6 +92,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
@ -136,6 +138,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);
boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
@ -164,6 +167,11 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
"true");
}
if (!treatTinyInt1AsBoolean) {
// set jdbc config 'tinyInt1isBit' to false
configMap.put(PROPERTIES_PREFIX + PropertyKey.tinyInt1isBit.getKeyName(), "false");
}
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(hostname)
@ -189,7 +197,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges);
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean);
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

@ -71,7 +71,8 @@ public class MySqlDataSource implements DataSource {
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
readableMetadataList,
includeComments);
includeComments,
sourceConfig.isTreatTinyInt1AsBoolean());
MySqlSource<Event> source =
new MySqlSource<>(

@ -298,4 +298,11 @@ public class MySqlDataSourceOptions {
.withDescription(
"Whether enable include table and column comments, by default is false, if set to true, table and column comments will be sent. "
+ "Note: Enable this option will bring the implications on memory usage.");
@Experimental
public static final ConfigOption<Boolean> TREAT_TINYINT1_AS_BOOLEAN_ENABLED =
ConfigOptions.key("treat-tinyint1-as-boolean.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether treat TINYINT(1) as boolean, by default is true. ");
}

@ -62,6 +62,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final boolean includeSchemaChanges;
private final boolean tinyInt1isBit;
private final boolean includeComments;
private transient Tables tables;
@ -70,26 +71,35 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private List<MySqlReadableMetadata> readableMetadataList;
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
this(changelogMode, includeSchemaChanges, new ArrayList<>(), false);
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
boolean tinyInt1isBit) {
this(
changelogMode,
includeSchemaChanges,
new ArrayList<>(),
includeSchemaChanges,
tinyInt1isBit);
}
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
List<MySqlReadableMetadata> readableMetadataList,
boolean includeComments) {
boolean includeComments,
boolean tinyInt1isBit) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
this.readableMetadataList = readableMetadataList;
this.includeComments = includeComments;
this.tinyInt1isBit = tinyInt1isBit;
}
@Override
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
if (customParser == null) {
customParser = new CustomMySqlAntlrDdlParser(includeComments);
customParser = new CustomMySqlAntlrDdlParser(includeComments, tinyInt1isBit);
tables = new Tables();
}

@ -60,6 +60,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private final MySqlAntlrDdlParser parser;
private final List<ParseTreeListener> listeners;
private final LinkedList<SchemaChangeEvent> changes;
private final boolean tinyInt1isBit;
private org.apache.flink.cdc.common.event.TableId currentTable;
private List<ColumnEditor> columnEditors;
private CustomColumnDefinitionParserListener columnDefinitionListener;
@ -70,10 +71,12 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
public CustomAlterTableParserListener(
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners,
LinkedList<SchemaChangeEvent> changes) {
LinkedList<SchemaChangeEvent> changes,
boolean tinyInt1isBit) {
this.parser = parser;
this.listeners = listeners;
this.changes = changes;
this.tinyInt1isBit = tinyInt1isBit;
}
@Override
@ -315,7 +318,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
String newColumnName = parser.parseName(ctx.newColumn);
Map<String, DataType> typeMapping = new HashMap<>();
typeMapping.put(column.name(), fromDbzColumn(column));
typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) {
@ -366,7 +369,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
() -> {
Column column = columnDefinitionListener.getColumn();
Map<String, DataType> typeMapping = new HashMap<>();
typeMapping.put(column.name(), fromDbzColumn(column));
typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));
changes.add(new AlterColumnTypeEvent(currentTable, typeMapping));
listeners.remove(columnDefinitionListener);
},
@ -413,7 +416,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(),
fromDbzColumn(dbzColumn),
fromDbzColumn(dbzColumn, tinyInt1isBit),
dbzColumn.comment(),
dbzColumn.defaultValueExpression().orElse(null));
}

@ -35,10 +35,12 @@ import java.util.List;
public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
private final LinkedList<SchemaChangeEvent> parsedEvents;
private final boolean tinyInt1isBit;
public CustomMySqlAntlrDdlParser(boolean includeComments) {
public CustomMySqlAntlrDdlParser(boolean includeComments, boolean tinyInt1isBit) {
super(true, false, includeComments, null, Tables.TableFilter.includeAll());
this.parsedEvents = new LinkedList<>();
this.tinyInt1isBit = tinyInt1isBit;
}
// Overriding this method because the BIT type requires default length dimension of 1.
@ -278,7 +280,7 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
@Override
protected AntlrDdlParserListener createParseTreeWalkerListener() {
return new CustomMySqlAntlrDdlParserListener(this, parsedEvents);
return new CustomMySqlAntlrDdlParserListener(this, parsedEvents, tinyInt1isBit);
}
public List<SchemaChangeEvent> getAndClearParsedEvents() {

@ -74,12 +74,15 @@ public class CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener
private final Collection<ParsingException> errors = new ArrayList<>();
public CustomMySqlAntlrDdlParserListener(
MySqlAntlrDdlParser parser, LinkedList<SchemaChangeEvent> parsedEvents) {
MySqlAntlrDdlParser parser,
LinkedList<SchemaChangeEvent> parsedEvents,
boolean tinyInt1isBit) {
// initialize listeners
listeners.add(new CreateAndAlterDatabaseParserListener(parser));
listeners.add(new DropDatabaseParserListener(parser));
listeners.add(new CreateTableParserListener(parser, listeners));
listeners.add(new CustomAlterTableParserListener(parser, listeners, parsedEvents));
listeners.add(
new CustomAlterTableParserListener(parser, listeners, parsedEvents, tinyInt1isBit));
listeners.add(new DropTableParserListener(parser));
listeners.add(new RenameTableParserListener(parser));
listeners.add(new TruncateTableParserListener(parser));

@ -202,7 +202,8 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
Column column = columns.get(i);
String colName = column.name();
DataType dataType = MySqlTypeUtils.fromDbzColumn(column);
DataType dataType =
MySqlTypeUtils.fromDbzColumn(column, sourceConfig.isTreatTinyInt1AsBoolean());
if (!column.isOptional()) {
dataType = dataType.notNull();
}

@ -129,14 +129,14 @@ public class MySqlSchemaUtils {
new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) {
TableChanges.TableChange tableSchema =
mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId));
return toSchema(tableSchema.getTable());
return toSchema(tableSchema.getTable(), sourceConfig.isTreatTinyInt1AsBoolean());
}
}
public static Schema toSchema(Table table) {
public static Schema toSchema(Table table, boolean tinyInt1isBit) {
List<Column> columns =
table.columns().stream()
.map(MySqlSchemaUtils::toColumn)
.map(column -> toColumn(column, tinyInt1isBit))
.collect(Collectors.toList());
return Schema.newBuilder()
@ -146,9 +146,11 @@ public class MySqlSchemaUtils {
.build();
}
public static Column toColumn(io.debezium.relational.Column column) {
public static Column toColumn(io.debezium.relational.Column column, boolean tinyInt1isBit) {
return Column.physicalColumn(
column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment());
column.name(),
MySqlTypeUtils.fromDbzColumn(column, tinyInt1isBit),
column.comment());
}
public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {

@ -110,8 +110,8 @@ public class MySqlTypeUtils {
private static final int FLOAT_LENGTH_UNSPECIFIED_FLAG = -1;
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
DataType dataType = convertFromColumn(column);
public static DataType fromDbzColumn(Column column, boolean tinyInt1isBit) {
DataType dataType = convertFromColumn(column, tinyInt1isBit);
if (column.isOptional()) {
return dataType;
} else {
@ -123,7 +123,7 @@ public class MySqlTypeUtils {
* Returns a corresponding Flink data type from a debezium {@link Column} with nullable always
* be true.
*/
private static DataType convertFromColumn(Column column) {
private static DataType convertFromColumn(Column column, boolean tinyInt1isBit) {
String typeName = column.typeName();
switch (typeName) {
case BIT:
@ -138,7 +138,9 @@ public class MySqlTypeUtils {
// user should not use tinyint(1) to store number although jdbc url parameter
// tinyInt1isBit=false can help change the return value, it's not a general way
// btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default
return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT();
return (column.length() == 1 && tinyInt1isBit)
? DataTypes.BOOLEAN()
: DataTypes.TINYINT();
case TINYINT_UNSIGNED:
case TINYINT_UNSIGNED_ZEROFILL:
case SMALLINT:

@ -102,13 +102,23 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
}
@Test
public void testMysql57AccessCommonTypesSchema() {
testAccessCommonTypesSchema(fullTypesMySql57Database);
public void testMysql57AccessCommonTypesSchemaTinyInt1isBit() {
testAccessCommonTypesSchema(fullTypesMySql57Database, true);
}
@Test
public void testMysql8AccessCommonTypesSchema() {
testAccessCommonTypesSchema(fullTypesMySql8Database);
public void testMysql57AccessCommonTypesSchemaTinyInt1isNotBit() {
testAccessCommonTypesSchema(fullTypesMySql57Database, false);
}
@Test
public void testMysql8AccessCommonTypesSchemaTinyInt1isBit() {
testAccessCommonTypesSchema(fullTypesMySql8Database, true);
}
@Test
public void testMysql8AccessCommonTypesSchemaTinyInt1isNotBit() {
testAccessCommonTypesSchema(fullTypesMySql8Database, false);
}
@Test
@ -117,7 +127,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
String[] tables = new String[] {"time_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql57Database);
getMetadataAccessor(tables, fullTypesMySql57Database, true);
Schema actualSchema =
metadataAccessor.getTableSchema(
@ -163,7 +173,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
String[] tables = new String[] {"time_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql8Database);
getMetadataAccessor(tables, fullTypesMySql8Database, true);
Schema actualSchema =
metadataAccessor.getTableSchema(
@ -213,7 +223,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
String[] tables = new String[] {"precision_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql57Database);
getMetadataAccessor(tables, fullTypesMySql57Database, true);
Schema actualSchema =
metadataAccessor.getTableSchema(
@ -288,7 +298,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
String[] tables = new String[] {"precision_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql8Database);
getMetadataAccessor(tables, fullTypesMySql8Database, false);
Schema actualSchema =
metadataAccessor.getTableSchema(
@ -361,7 +371,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
database.createAndInitialize();
String[] tables = new String[] {"common_types", "time_types", "precision_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true);
assertThatThrownBy(metadataAccessor::listNamespaces)
.isInstanceOf(UnsupportedOperationException.class);
@ -377,11 +387,12 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
assertThat(actualTables).containsExactlyInAnyOrderElementsOf(expectedTables);
}
private void testAccessCommonTypesSchema(UniqueDatabase database) {
private void testAccessCommonTypesSchema(UniqueDatabase database, boolean tinyint1IsBit) {
database.createAndInitialize();
String[] tables = new String[] {"common_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, database, tinyint1IsBit);
Schema actualSchema =
metadataAccessor.getTableSchema(
@ -427,8 +438,12 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
DataTypes.STRING(),
DataTypes.BOOLEAN(),
DataTypes.BINARY(1),
DataTypes.BOOLEAN(),
DataTypes.BOOLEAN(),
tinyint1IsBit
? DataTypes.BOOLEAN()
: DataTypes.TINYINT(),
tinyint1IsBit
? DataTypes.BOOLEAN()
: DataTypes.TINYINT(),
DataTypes.BINARY(16),
DataTypes.BINARY(8),
DataTypes.STRING(),
@ -507,12 +522,14 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
assertThat(actualSchema).isEqualTo(expectedSchema);
}
private MySqlMetadataAccessor getMetadataAccessor(String[] tables, UniqueDatabase database) {
MySqlSourceConfig sourceConfig = getConfig(tables, database);
private MySqlMetadataAccessor getMetadataAccessor(
String[] tables, UniqueDatabase database, boolean tinyint1IsBit) {
MySqlSourceConfig sourceConfig = getConfig(tables, database, tinyint1IsBit);
return new MySqlMetadataAccessor(sourceConfig);
}
private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase database) {
private MySqlSourceConfig getConfig(
String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> database.getDatabaseName() + "." + tableName)
@ -530,6 +547,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
.username(database.getUsername())
.password(database.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
.treatTinyInt1AsBoolean(tinyint1IsBit)
.createConfig(0);
}
}

@ -271,6 +271,16 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
return result;
}
@Test
public void testParseAlterStatementTinyintIsBit() throws Exception {
testParseAlterStatement(true);
}
@Test
public void testParseAlterStatementTinyint1IsNotBit() throws Exception {
testParseAlterStatement(false);
}
@Test
public void testInitialStartupModeWithOpTs() throws Exception {
inventoryDatabase.createAndInitialize();
@ -433,10 +443,10 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
}
}
@Test
public void testParseAlterStatement() throws Exception {
public void testParseAlterStatement(boolean tinyInt1isBit) throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
@ -448,6 +458,7 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
.startupOptions(StartupOptions.latest())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
.treatTinyInt1AsBoolean(tinyInt1isBit)
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
FlinkSourceProvider sourceProvider =
@ -548,6 +559,21 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("cols9", DataTypes.CHAR(1))))));
statement.execute(
String.format(
"ALTER TABLE `%s`.`products` ADD COLUMN `cols10` TINYINT(1) NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"cols10",
tinyInt1isBit
? DataTypes.BOOLEAN()
: DataTypes.TINYINT())))));
// Drop orders table first to remove foreign key restraints
statement.execute(
String.format(
@ -569,9 +595,19 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
}
@Test
public void testSchemaChangeEvents() throws Exception {
public void testSchemaChangeEventstinyInt1isBit() throws Exception {
testSchemaChangeEvents(true);
}
@Test
public void testSchemaChangeEventsTinyint1IsNotBit() throws Exception {
testSchemaChangeEvents(false);
}
public void testSchemaChangeEvents(boolean tinyInt1isBit) throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
@ -583,6 +619,7 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
.startupOptions(StartupOptions.latest())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
.treatTinyInt1AsBoolean(tinyInt1isBit)
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
FlinkSourceProvider sourceProvider =
@ -614,6 +651,35 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("newcol1", DataTypes.INT())))));
// Add a TINYINT(1) column
statement.execute(
String.format(
"ALTER TABLE `%s`.`customers` ADD COLUMN `new_tinyint1_col1` TINYINT(1) NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"new_tinyint1_col1",
tinyInt1isBit
? DataTypes.BOOLEAN()
: DataTypes.TINYINT())))));
// Add a new BOOLEAN column
statement.execute(
String.format(
"ALTER TABLE `%s`.`customers` ADD COLUMN `new_bool_col1` bool NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"new_bool_col1", DataTypes.BOOLEAN())))));
// Test MODIFY COLUMN DDL
statement.execute(
String.format(
@ -625,6 +691,16 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonMap("newcol1", DataTypes.DOUBLE())));
statement.execute(
String.format(
"ALTER TABLE `%s`.`customers` MODIFY COLUMN `new_tinyint1_col1` INT;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AlterColumnTypeEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
Collections.singletonMap("new_tinyint1_col1", DataTypes.INT())));
// Test CHANGE COLUMN DDL
statement.execute(
String.format(
@ -790,7 +866,11 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
.physicalColumn("big_decimal_c", DataTypes.STRING())
.physicalColumn("bit1_c", DataTypes.BOOLEAN())
.physicalColumn("bit3_c", DataTypes.BINARY(1))
.physicalColumn("tiny1_c", DataTypes.BOOLEAN())
.physicalColumn(
"tiny1_c",
tinyInt1isBit
? DataTypes.BOOLEAN()
: DataTypes.TINYINT())
.physicalColumn("boolean_c", DataTypes.BOOLEAN())
.physicalColumn("file_uuid", DataTypes.BINARY(16))
.physicalColumn("bit_c", DataTypes.BINARY(8))

@ -252,7 +252,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
RowType splitKeyType =
ChunkUtils.getChunkKeyColumnType(
statefulTaskContext.getDatabaseSchema().tableFor(tableId),
statefulTaskContext.getSourceConfig().getChunkKeyColumns());
statefulTaskContext.getSourceConfig().getChunkKeyColumns(),
statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean());
Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord);
Object[] chunkKey =

@ -107,8 +107,8 @@ public class MySqlTypeUtils {
private static final String UNKNOWN = "UNKNOWN";
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
DataType dataType = convertFromColumn(column);
public static DataType fromDbzColumn(Column column, boolean tinyInt1isBit) {
DataType dataType = convertFromColumn(column, tinyInt1isBit);
if (column.isOptional()) {
return dataType;
} else {
@ -120,7 +120,7 @@ public class MySqlTypeUtils {
* Returns a corresponding Flink data type from a debezium {@link Column} with nullable always
* be true.
*/
private static DataType convertFromColumn(Column column) {
private static DataType convertFromColumn(Column column, boolean tinyInt1isBit) {
String typeName = column.typeName();
switch (typeName) {
case BIT:
@ -135,7 +135,9 @@ public class MySqlTypeUtils {
// user should not use tinyint(1) to store number although jdbc url parameter
// tinyInt1isBit=false can help change the return value, it's not a general way
// btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default
return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT();
return (column.length() == 1 && tinyInt1isBit)
? DataTypes.BOOLEAN()
: DataTypes.TINYINT();
case TINYINT_UNSIGNED:
case TINYINT_UNSIGNED_ZEROFILL:
case SMALLINT:

@ -148,7 +148,9 @@ public class MySqlChunkSplitter implements ChunkSplitter {
splitColumn =
ChunkUtils.getChunkKeyColumn(
currentSplittingTable, sourceConfig.getChunkKeyColumns());
splitType = ChunkUtils.getChunkKeyColumnType(splitColumn);
splitType =
ChunkUtils.getChunkKeyColumnType(
splitColumn, sourceConfig.isTreatTinyInt1AsBoolean());
minMaxOfSplitColumn =
StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name());
approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId);
@ -385,7 +387,7 @@ public class MySqlChunkSplitter implements ChunkSplitter {
Object max,
int chunkSize,
long approximateRowCnt) {
if (!isEvenlySplitColumn(splitColumn)) {
if (!isEvenlySplitColumn(splitColumn, sourceConfig.isTreatTinyInt1AsBoolean())) {
return -1;
}
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
@ -410,8 +412,8 @@ public class MySqlChunkSplitter implements ChunkSplitter {
}
/** Checks whether split column is evenly distributed across its range. */
private static boolean isEvenlySplitColumn(Column splitColumn) {
DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
private static boolean isEvenlySplitColumn(Column splitColumn, boolean tinyInt1isBit) {
DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn, tinyInt1isBit);
LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
// currently, we only support the optimization that split column with type BIGINT, INT,

@ -74,6 +74,7 @@ public class MySqlSourceConfig implements Serializable {
private final Properties dbzProperties;
private final Configuration dbzConfiguration;
private final MySqlConnectorConfig dbzMySqlConfig;
private final boolean treatTinyInt1AsBoolean;
MySqlSourceConfig(
String hostname,
@ -101,7 +102,8 @@ public class MySqlSourceConfig implements Serializable {
Properties jdbcProperties,
Map<ObjectPath, String> chunkKeyColumns,
boolean skipSnapshotBackfill,
boolean parseOnLineSchemaChanges) {
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@ -130,6 +132,7 @@ public class MySqlSourceConfig implements Serializable {
this.chunkKeyColumns = chunkKeyColumns;
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
}
public String getHostname() {
@ -261,4 +264,8 @@ public class MySqlSourceConfig implements Serializable {
public boolean isSkipSnapshotBackfill() {
return skipSnapshotBackfill;
}
public boolean isTreatTinyInt1AsBoolean() {
return treatTinyInt1AsBoolean;
}
}

@ -71,6 +71,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
private boolean skipSnapshotBackfill = false;
private boolean parseOnLineSchemaChanges = false;
private boolean treatTinyInt1AsBoolean = true;
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@ -298,6 +299,11 @@ public class MySqlSourceConfigFactory implements Serializable {
return this;
}
public MySqlSourceConfigFactory treatTinyInt1AsBoolean(boolean treatTinyInt1AsBoolean) {
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
return this;
}
/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
public MySqlSourceConfig createConfig(int subtaskId) {
// hard code server name, because we don't need to distinguish it, docs:
@ -392,6 +398,7 @@ public class MySqlSourceConfigFactory implements Serializable {
jdbcProperties,
chunkKeyColumns,
skipSnapshotBackfill,
parseOnLineSchemaChanges);
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean);
}
}

@ -45,13 +45,15 @@ public class ChunkUtils {
private ChunkUtils() {}
public static RowType getChunkKeyColumnType(
Table table, Map<ObjectPath, String> chunkKeyColumns) {
return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns));
Table table, Map<ObjectPath, String> chunkKeyColumns, boolean tinyInt1isBit) {
return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns), tinyInt1isBit);
}
public static RowType getChunkKeyColumnType(Column chunkKeyColumn) {
public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyInt1isBit) {
return (RowType)
ROW(FIELD(chunkKeyColumn.name(), MySqlTypeUtils.fromDbzColumn(chunkKeyColumn)))
ROW(FIELD(
chunkKeyColumn.name(),
MySqlTypeUtils.fromDbzColumn(chunkKeyColumn, tinyInt1isBit)))
.getLogicalType();
}

@ -585,7 +585,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
RowType splitKeyType =
ChunkUtils.getChunkKeyColumnType(
Column.editor().name("id").type("INT").jdbcType(4).create());
Column.editor().name("id").type("INT").jdbcType(4).create(), true);
List<MySqlSchemalessSnapshotSplit> remainingSplits =
Arrays.asList(
new MySqlSchemalessSnapshotSplit(
