diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index 0de46fd49..afd189716 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -305,6 +305,13 @@ pipeline: Boolean 是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。 + + treat-tinyint1-as-boolean.enabled + optional + true + Boolean + 是否将TINYINT(1)类型当做Boolean类型处理,默认true。 + diff --git a/docs/content.zh/docs/faq/faq.md b/docs/content.zh/docs/faq/faq.md index 54b55fe99..abacaf548 100644 --- a/docs/content.zh/docs/faq/faq.md +++ b/docs/content.zh/docs/faq/faq.md @@ -207,6 +207,19 @@ restart-strategy.fixed-delay.delay= 30s 1. tableList选项要求表名使用数据库名,而不是DataStream API中的表名。对于MySQL CDC源代码,tableList选项值应该类似于‘my_db.my_table’。 2. 如果要同步排除products和orders表之外的整个my_db库,tableList选项值应该类似于‘my_db.(?!products|orders).*’。 +### Q16: MySQL源表中存在TINYINT(1)类型的列,且部分行的数值>1,Pipeline作业下游接收到的数据却是true/false,为什么? +这是由于MySQL连接参数`tinyInt1isBit`默认值为`true`,Flink CDC 3.3.0之前的版本未处理该参数,导致TINYINT(1)类型的数据被解析为布尔值。 +若需将其转换为实际值,请将CDC升级至3.3.0+,并在source节点添加配置`treat-tinyint1-as-boolean.enabled: false`。 +例如: +```yaml +source: + type: mysql + ... + treat-tinyint1-as-boolean.enabled: false + +sink: + type: ... +``` ## Postgres CDC FAQ ### Q1: 发现 PG 服务器磁盘使用率高,WAL 不释放 是什么原因? diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index df0faec4d..9bcd2a617 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -313,6 +313,13 @@ pipeline: Whether enable include table and column comments, by default is false, if set to true, the table and column comments will be sent.
Note: Enable this option will bring the implications on memory usage. + + treat-tinyint1-as-boolean.enabled + optional + true + Boolean + Whether to treat TINYINT(1) as boolean. Defaults to true. + diff --git a/docs/content/docs/faq/faq.md b/docs/content/docs/faq/faq.md index 2b57167d5..a74c35506 100644 --- a/docs/content/docs/faq/faq.md +++ b/docs/content/docs/faq/faq.md @@ -210,6 +210,19 @@ The reason for this problem is that the reading of the full volume phase of the 1. The `tableList` option requires table name with database name rather than table name in DataStream API. For MySQL CDC source, the `tableList` option value should like ‘my_db.my_table’. 2. If you need to synchronize the whole mydb database excluding the products and orders tables, the `tableList` option value should like 'my_db.(?!products|orders).*'. +### Q16: The MySQL source table has a TINYINT(1) column in which some rows contain values greater than 1, but the downstream of the pipeline job receives these values as true/false. Why does this happen? +This is because the MySQL connection parameter `tinyInt1isBit` defaults to `true` and Flink CDC versions before 3.3.0 did not handle this parameter, so TINYINT(1) data is interpreted as boolean values. +To receive the actual values, upgrade Flink CDC to 3.3.0 or later and add the configuration `treat-tinyint1-as-boolean.enabled: false` to the source node. +For example: +```yaml +source: + type: mysql + ... + treat-tinyint1-as-boolean.enabled: false + +sink: + type: ... +``` ## Postgres CDC FAQ ### Q1: It is found that the disk utilization rate of PG server is high. What is the reason why wal is not released? 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 41e60890e..48819c50c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -41,6 +41,7 @@ import org.apache.flink.cdc.debezium.table.DebeziumOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectPath; +import com.mysql.cj.conf.PropertyKey; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.Tables; import org.slf4j.Logger; @@ -91,6 +92,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; @@ -136,6 +138,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory { boolean closeIdleReaders = 
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED); + boolean treatTinyInt1AsBoolean = config.get(TREAT_TINYINT1_AS_BOOLEAN_ENABLED); Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); Duration connectTimeout = config.get(CONNECT_TIMEOUT); @@ -164,6 +167,11 @@ public class MySqlDataSourceFactory implements DataSourceFactory { "true"); } + if (!treatTinyInt1AsBoolean) { + // set jdbc config 'tinyInt1isBit' to false + configMap.put(PROPERTIES_PREFIX + PropertyKey.tinyInt1isBit.getKeyName(), "false"); + } + MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(hostname) @@ -189,7 +197,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory { .debeziumProperties(getDebeziumProperties(configMap)) .jdbcProperties(getJdbcProperties(configMap)) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) - .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges); + .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges) + .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean); List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index 7b4ee5eab..81912b4a8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -71,7 +71,8 @@ public class MySqlDataSource implements DataSource { DebeziumChangelogMode.ALL, 
sourceConfig.isIncludeSchemaChanges(), readableMetadataList, - includeComments); + includeComments, + sourceConfig.isTreatTinyInt1AsBoolean()); MySqlSource source = new MySqlSource<>( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 374721244..89878d0ea 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -298,4 +298,11 @@ public class MySqlDataSourceOptions { .withDescription( "Whether enable include table and column comments, by default is false, if set to true, table and column comments will be sent. " + "Note: Enable this option will bring the implications on memory usage."); + + @Experimental + public static final ConfigOption TREAT_TINYINT1_AS_BOOLEAN_ENABLED = + ConfigOptions.key("treat-tinyint1-as-boolean.enabled") + .booleanType() + .defaultValue(true) + .withDescription("Whether treat TINYINT(1) as boolean, by default is true. 
"); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java index 004fc5e1a..7fee4ebfd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -62,6 +62,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final boolean includeSchemaChanges; + private final boolean tinyInt1isBit; private final boolean includeComments; private transient Tables tables; @@ -70,26 +71,35 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private List readableMetadataList; public MySqlEventDeserializer( - DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) { - this(changelogMode, includeSchemaChanges, new ArrayList<>(), false); + DebeziumChangelogMode changelogMode, + boolean includeSchemaChanges, + boolean tinyInt1isBit) { + this( + changelogMode, + includeSchemaChanges, + new ArrayList<>(), + false, // includeComments: this overload keeps comment parsing disabled, as it did before + tinyInt1isBit); } public MySqlEventDeserializer( DebeziumChangelogMode changelogMode, boolean includeSchemaChanges, List readableMetadataList, - boolean includeComments) { + boolean includeComments, + boolean tinyInt1isBit) { super(new MySqlSchemaDataTypeInference(), changelogMode); this.includeSchemaChanges = includeSchemaChanges; this.readableMetadataList 
= readableMetadataList; this.includeComments = includeComments; + 
this.tinyInt1isBit = tinyInt1isBit; } @Override protected List deserializeSchemaChangeRecord(SourceRecord record) { if (includeSchemaChanges) { if (customParser == null) { - customParser = new CustomMySqlAntlrDdlParser(includeComments); + customParser = new CustomMySqlAntlrDdlParser(includeComments, tinyInt1isBit); tables = new Tables(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 3b30b3c49..31ee183e5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -60,6 +60,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { private final MySqlAntlrDdlParser parser; private final List listeners; private final LinkedList changes; + private final boolean tinyInt1isBit; private org.apache.flink.cdc.common.event.TableId currentTable; private List columnEditors; private CustomColumnDefinitionParserListener columnDefinitionListener; @@ -70,10 +71,12 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { public CustomAlterTableParserListener( MySqlAntlrDdlParser parser, List listeners, - LinkedList changes) { + LinkedList changes, + boolean tinyInt1isBit) { this.parser = parser; this.listeners = listeners; this.changes = changes; + this.tinyInt1isBit = tinyInt1isBit; } @Override @@ -315,7 +318,7 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener { String newColumnName = parser.parseName(ctx.newColumn); Map typeMapping = new HashMap<>(); - typeMapping.put(column.name(), fromDbzColumn(column)); + typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit)); changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); if (newColumnName != null && !column.name().equalsIgnoreCase(newColumnName)) { @@ -366,7 +369,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { () -> { Column column = columnDefinitionListener.getColumn(); Map typeMapping = new HashMap<>(); - typeMapping.put(column.name(), fromDbzColumn(column)); + typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit)); changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); listeners.remove(columnDefinitionListener); }, @@ -413,7 +416,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) { return org.apache.flink.cdc.common.schema.Column.physicalColumn( dbzColumn.name(), - fromDbzColumn(dbzColumn), + fromDbzColumn(dbzColumn, tinyInt1isBit), dbzColumn.comment(), dbzColumn.defaultValueExpression().orElse(null)); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java index 624d1ac41..9a29ebe46 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java @@ -35,10 +35,12 @@ import java.util.List; public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser { private final LinkedList parsedEvents; + private final boolean tinyInt1isBit; - public CustomMySqlAntlrDdlParser(boolean includeComments) { + public CustomMySqlAntlrDdlParser(boolean includeComments, boolean tinyInt1isBit) { super(true, false, includeComments, null, Tables.TableFilter.includeAll()); this.parsedEvents = new LinkedList<>(); + this.tinyInt1isBit = tinyInt1isBit; } // Overriding this method because the BIT type requires default length dimension of 1. @@ -278,7 +280,7 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser { @Override protected AntlrDdlParserListener createParseTreeWalkerListener() { - return new CustomMySqlAntlrDdlParserListener(this, parsedEvents); + return new CustomMySqlAntlrDdlParserListener(this, parsedEvents, tinyInt1isBit); } public List getAndClearParsedEvents() { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java index 445373309..7e24e2647 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java @@ -74,12 +74,15 @@ public class 
CustomMySqlAntlrDdlParserListener extends MySqlParserBaseListener private final Collection errors = new ArrayList<>(); public CustomMySqlAntlrDdlParserListener( - MySqlAntlrDdlParser parser, LinkedList parsedEvents) { + MySqlAntlrDdlParser parser, + LinkedList parsedEvents, + boolean tinyInt1isBit) { // initialize listeners listeners.add(new CreateAndAlterDatabaseParserListener(parser)); listeners.add(new DropDatabaseParserListener(parser)); listeners.add(new CreateTableParserListener(parser, listeners)); - listeners.add(new CustomAlterTableParserListener(parser, listeners, parsedEvents)); + listeners.add( + new CustomAlterTableParserListener(parser, listeners, parsedEvents, tinyInt1isBit)); listeners.add(new DropTableParserListener(parser)); listeners.add(new RenameTableParserListener(parser)); listeners.add(new TruncateTableParserListener(parser)); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index b5a6ec197..143efa120 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -202,7 +202,8 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { Column column = columns.get(i); String colName = column.name(); - DataType dataType = MySqlTypeUtils.fromDbzColumn(column); + DataType dataType = + MySqlTypeUtils.fromDbzColumn(column, sourceConfig.isTreatTinyInt1AsBoolean()); if (!column.isOptional()) { 
dataType = dataType.notNull(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java index bc4135cd1..616e28b1d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -129,14 +129,14 @@ public class MySqlSchemaUtils { new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) { TableChanges.TableChange tableSchema = mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId)); - return toSchema(tableSchema.getTable()); + return toSchema(tableSchema.getTable(), sourceConfig.isTreatTinyInt1AsBoolean()); } } - public static Schema toSchema(Table table) { + public static Schema toSchema(Table table, boolean tinyInt1isBit) { List columns = table.columns().stream() - .map(MySqlSchemaUtils::toColumn) + .map(column -> toColumn(column, tinyInt1isBit)) .collect(Collectors.toList()); return Schema.newBuilder() @@ -146,9 +146,11 @@ public class MySqlSchemaUtils { .build(); } - public static Column toColumn(io.debezium.relational.Column column) { + public static Column toColumn(io.debezium.relational.Column column, boolean tinyInt1isBit) { return Column.physicalColumn( - column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment()); + column.name(), + MySqlTypeUtils.fromDbzColumn(column, tinyInt1isBit), + column.comment()); } public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java index c82525cf6..05a23f7cc 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -110,8 +110,8 @@ public class MySqlTypeUtils { private static final int FLOAT_LENGTH_UNSPECIFIED_FLAG = -1; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ - public static DataType fromDbzColumn(Column column) { - DataType dataType = convertFromColumn(column); + public static DataType fromDbzColumn(Column column, boolean tinyInt1isBit) { + DataType dataType = convertFromColumn(column, tinyInt1isBit); if (column.isOptional()) { return dataType; } else { @@ -123,7 +123,7 @@ public class MySqlTypeUtils { * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always * be true. */ - private static DataType convertFromColumn(Column column) { + private static DataType convertFromColumn(Column column, boolean tinyInt1isBit) { String typeName = column.typeName(); switch (typeName) { case BIT: @@ -138,7 +138,9 @@ public class MySqlTypeUtils { // user should not use tinyint(1) to store number although jdbc url parameter // tinyInt1isBit=false can help change the return value, it's not a general way // btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default - return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT(); + return (column.length() == 1 && tinyInt1isBit) + ? 
DataTypes.BOOLEAN() + : DataTypes.TINYINT(); case TINYINT_UNSIGNED: case TINYINT_UNSIGNED_ZEROFILL: case SMALLINT: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java index 94f1ed45d..96f3b5f92 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java @@ -102,13 +102,23 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { } @Test - public void testMysql57AccessCommonTypesSchema() { - testAccessCommonTypesSchema(fullTypesMySql57Database); + public void testMysql57AccessCommonTypesSchemaTinyInt1isBit() { + testAccessCommonTypesSchema(fullTypesMySql57Database, true); } @Test - public void testMysql8AccessCommonTypesSchema() { - testAccessCommonTypesSchema(fullTypesMySql8Database); + public void testMysql57AccessCommonTypesSchemaTinyInt1isNotBit() { + testAccessCommonTypesSchema(fullTypesMySql57Database, false); + } + + @Test + public void testMysql8AccessCommonTypesSchemaTinyInt1isBit() { + testAccessCommonTypesSchema(fullTypesMySql8Database, true); + } + + @Test + public void testMysql8AccessCommonTypesSchemaTinyInt1isNotBit() { + testAccessCommonTypesSchema(fullTypesMySql8Database, false); } @Test @@ -117,7 +127,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { String[] tables = new String[] {"time_types"}; MySqlMetadataAccessor metadataAccessor = - getMetadataAccessor(tables, fullTypesMySql57Database); + 
getMetadataAccessor(tables, fullTypesMySql57Database, true); Schema actualSchema = metadataAccessor.getTableSchema( @@ -163,7 +173,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { String[] tables = new String[] {"time_types"}; MySqlMetadataAccessor metadataAccessor = - getMetadataAccessor(tables, fullTypesMySql8Database); + getMetadataAccessor(tables, fullTypesMySql8Database, true); Schema actualSchema = metadataAccessor.getTableSchema( @@ -213,7 +223,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { String[] tables = new String[] {"precision_types"}; MySqlMetadataAccessor metadataAccessor = - getMetadataAccessor(tables, fullTypesMySql57Database); + getMetadataAccessor(tables, fullTypesMySql57Database, true); Schema actualSchema = metadataAccessor.getTableSchema( @@ -288,7 +298,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { String[] tables = new String[] {"precision_types"}; MySqlMetadataAccessor metadataAccessor = - getMetadataAccessor(tables, fullTypesMySql8Database); + getMetadataAccessor(tables, fullTypesMySql8Database, false); Schema actualSchema = metadataAccessor.getTableSchema( @@ -361,7 +371,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { database.createAndInitialize(); String[] tables = new String[] {"common_types", "time_types", "precision_types"}; - MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database); + MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database, true); assertThatThrownBy(metadataAccessor::listNamespaces) .isInstanceOf(UnsupportedOperationException.class); @@ -377,11 +387,12 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { assertThat(actualTables).containsExactlyInAnyOrderElementsOf(expectedTables); } - private void testAccessCommonTypesSchema(UniqueDatabase database) { + private void testAccessCommonTypesSchema(UniqueDatabase database, boolean tinyint1IsBit) 
{ database.createAndInitialize(); String[] tables = new String[] {"common_types"}; - MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database); + MySqlMetadataAccessor metadataAccessor = + getMetadataAccessor(tables, database, tinyint1IsBit); Schema actualSchema = metadataAccessor.getTableSchema( @@ -427,8 +438,12 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { DataTypes.STRING(), DataTypes.BOOLEAN(), DataTypes.BINARY(1), - DataTypes.BOOLEAN(), - DataTypes.BOOLEAN(), + tinyint1IsBit + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT(), + tinyint1IsBit + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT(), DataTypes.BINARY(16), DataTypes.BINARY(8), DataTypes.STRING(), @@ -507,12 +522,14 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { assertThat(actualSchema).isEqualTo(expectedSchema); } - private MySqlMetadataAccessor getMetadataAccessor(String[] tables, UniqueDatabase database) { - MySqlSourceConfig sourceConfig = getConfig(tables, database); + private MySqlMetadataAccessor getMetadataAccessor( + String[] tables, UniqueDatabase database, boolean tinyint1IsBit) { + MySqlSourceConfig sourceConfig = getConfig(tables, database, tinyint1IsBit); return new MySqlMetadataAccessor(sourceConfig); } - private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase database) { + private MySqlSourceConfig getConfig( + String[] captureTables, UniqueDatabase database, boolean tinyint1IsBit) { String[] captureTableIds = Arrays.stream(captureTables) .map(tableName -> database.getDatabaseName() + "." 
+ tableName) @@ -530,6 +547,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { .username(database.getUsername()) .password(database.getPassword()) .serverTimeZone(ZoneId.of("UTC").toString()) + .treatTinyInt1AsBoolean(tinyint1IsBit) .createConfig(0); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 1252733f4..3e338a68c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -271,6 +271,16 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { return result; } + @Test + public void testParseAlterStatementTinyintIsBit() throws Exception { + testParseAlterStatement(true); + } + + @Test + public void testParseAlterStatementTinyint1IsNotBit() throws Exception { + testParseAlterStatement(false); + } + @Test public void testInitialStartupModeWithOpTs() throws Exception { inventoryDatabase.createAndInitialize(); @@ -433,10 +443,10 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { } } - @Test - public void testParseAlterStatement() throws Exception { + public void testParseAlterStatement(boolean tinyInt1isBit) throws Exception { env.setParallelism(1); inventoryDatabase.createAndInitialize(); + MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(MYSQL8_CONTAINER.getHost()) @@ -448,6 +458,7 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { .startupOptions(StartupOptions.latest()) 
.serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") + .treatTinyInt1AsBoolean(tinyInt1isBit) .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = @@ -548,6 +559,21 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("cols9", DataTypes.CHAR(1)))))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`products` ADD COLUMN `cols10` TINYINT(1) NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "cols10", + tinyInt1isBit + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT()))))); + // Drop orders table first to remove foreign key restraints statement.execute( String.format( @@ -569,9 +595,19 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { } @Test - public void testSchemaChangeEvents() throws Exception { + public void testSchemaChangeEventstinyInt1isBit() throws Exception { + testSchemaChangeEvents(true); + } + + @Test + public void testSchemaChangeEventsTinyint1IsNotBit() throws Exception { + testSchemaChangeEvents(false); + } + + public void testSchemaChangeEvents(boolean tinyInt1isBit) throws Exception { env.setParallelism(1); inventoryDatabase.createAndInitialize(); + MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory() .hostname(MYSQL8_CONTAINER.getHost()) @@ -583,6 +619,7 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { .startupOptions(StartupOptions.latest()) .serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") + .treatTinyInt1AsBoolean(tinyInt1isBit) .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = @@ -614,6 +651,35 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("newcol1", DataTypes.INT()))))); 
+ // Add a TINYINT(1) column + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` ADD COLUMN `new_tinyint1_col1` TINYINT(1) NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "new_tinyint1_col1", + tinyInt1isBit + ? DataTypes.BOOLEAN() + : DataTypes.TINYINT()))))); + + // Add a new BOOLEAN column + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` ADD COLUMN `new_bool_col1` bool NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "new_bool_col1", DataTypes.BOOLEAN()))))); + // Test MODIFY COLUMN DDL statement.execute( String.format( @@ -625,6 +691,16 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), Collections.singletonMap("newcol1", DataTypes.DOUBLE()))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` MODIFY COLUMN `new_tinyint1_col1` INT;", + inventoryDatabase.getDatabaseName())); + + expected.add( + new AlterColumnTypeEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("new_tinyint1_col1", DataTypes.INT()))); + // Test CHANGE COLUMN DDL statement.execute( String.format( @@ -790,7 +866,11 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { .physicalColumn("big_decimal_c", DataTypes.STRING()) .physicalColumn("bit1_c", DataTypes.BOOLEAN()) .physicalColumn("bit3_c", DataTypes.BINARY(1)) - .physicalColumn("tiny1_c", DataTypes.BOOLEAN()) + .physicalColumn( + "tiny1_c", + tinyInt1isBit + ? 
DataTypes.BOOLEAN() + : DataTypes.TINYINT()) .physicalColumn("boolean_c", DataTypes.BOOLEAN()) .physicalColumn("file_uuid", DataTypes.BINARY(16)) .physicalColumn("bit_c", DataTypes.BINARY(8)) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 447fda96a..dee96802c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -252,7 +252,8 @@ public class BinlogSplitReader implements DebeziumReader chunkKeyColumns, boolean skipSnapshotBackfill, - boolean parseOnLineSchemaChanges) { + boolean parseOnLineSchemaChanges, + boolean treatTinyInt1AsBoolean) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -130,6 +132,7 @@ public class MySqlSourceConfig implements Serializable { this.chunkKeyColumns = chunkKeyColumns; this.skipSnapshotBackfill = skipSnapshotBackfill; this.parseOnLineSchemaChanges = parseOnLineSchemaChanges; + this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; } public String getHostname() { @@ -261,4 +264,8 @@ public class MySqlSourceConfig implements Serializable { public boolean isSkipSnapshotBackfill() { return skipSnapshotBackfill; } + + public boolean isTreatTinyInt1AsBoolean() { + return treatTinyInt1AsBoolean; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index f0ca4cc96..c1459bf23 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -71,6 +71,7 @@ public class MySqlSourceConfigFactory implements Serializable { private Map chunkKeyColumns = new HashMap<>(); private boolean skipSnapshotBackfill = false; private boolean parseOnLineSchemaChanges = false; + private boolean treatTinyInt1AsBoolean = true; public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -298,6 +299,11 @@ public class MySqlSourceConfigFactory implements Serializable { return this; } + public MySqlSourceConfigFactory treatTinyInt1AsBoolean(boolean treatTinyInt1AsBoolean) { + this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; + return this; + } + /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. 
*/ public MySqlSourceConfig createConfig(int subtaskId) { // hard code server name, because we don't need to distinguish it, docs: @@ -392,6 +398,7 @@ public class MySqlSourceConfigFactory implements Serializable { jdbcProperties, chunkKeyColumns, skipSnapshotBackfill, - parseOnLineSchemaChanges); + parseOnLineSchemaChanges, + treatTinyInt1AsBoolean); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java index 493256099..405fd1f96 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java @@ -45,13 +45,15 @@ public class ChunkUtils { private ChunkUtils() {} public static RowType getChunkKeyColumnType( - Table table, Map chunkKeyColumns) { - return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns)); + Table table, Map chunkKeyColumns, boolean tinyInt1isBit) { + return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumns), tinyInt1isBit); } - public static RowType getChunkKeyColumnType(Column chunkKeyColumn) { + public static RowType getChunkKeyColumnType(Column chunkKeyColumn, boolean tinyInt1isBit) { return (RowType) - ROW(FIELD(chunkKeyColumn.name(), MySqlTypeUtils.fromDbzColumn(chunkKeyColumn))) + ROW(FIELD( + chunkKeyColumn.name(), + MySqlTypeUtils.fromDbzColumn(chunkKeyColumn, tinyInt1isBit))) .getLogicalType(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 7f16cab3f..807d29b0b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -585,7 +585,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { RowType splitKeyType = ChunkUtils.getChunkKeyColumnType( - Column.editor().name("id").type("INT").jdbcType(4).create()); + Column.editor().name("id").type("INT").jdbcType(4).create(), true); List remainingSplits = Arrays.asList( new MySqlSchemalessSnapshotSplit(