From 8815f2b879fd48ad6211141b7f627eb1d143a02c Mon Sep 17 00:00:00 2001 From: North Lin <37775475+qg-lin@users.noreply.github.com> Date: Fri, 17 Jan 2025 10:52:32 +0800 Subject: [PATCH] [FLINK-34865][pipeline-connector/mysql] Support sync newly added table's comment This closes #3869 --- .../CustomAlterTableParserListener.java | 16 +++++ .../mysql/source/MySqlPipelineITCase.java | 70 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 31ee183e5..af20c531c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -122,6 +122,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { if (tableEditor.hasPrimaryKey()) { builder.primaryKey(tableEditor.primaryKeyColumnNames()); } + builder.comment(tableEditor.create().comment()); changes.add( new CreateTableEvent( toCdcTableId(tableEditor.tableId()), builder.build())); @@ -413,6 +414,21 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { super.exitDropTable(ctx); } + @Override + public void enterTableOptionComment(MySqlParser.TableOptionCommentContext ctx) { + if (!parser.skipComments()) { + parser.runIfNotNull( + () -> { + if (ctx.COMMENT() != null) { + tableEditor.setComment( + parser.withoutQuotes(ctx.STRING_LITERAL().getText())); + } + }, + tableEditor); + } + super.enterTableOptionComment(ctx); + } + private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) { return org.apache.flink.cdc.common.schema.Column.physicalColumn( dbzColumn.name(), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 3e338a68c..7ef6cbeb1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -79,6 +79,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; @@ -1076,6 +1077,75 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { actual.stream().map(Object::toString).collect(Collectors.toList())); } + @Test + public void testIncludeCommentsForScanBinlogNewlyAddedTableEnabled() throws Exception { + env.setParallelism(1); + inventoryDatabase.createAndInitialize(); + TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products"); + TableId newTableId = + TableId.tableId(inventoryDatabase.getDatabaseName(), "products_with_comments2"); + + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL8_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL8_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(SERVER_TIME_ZONE.key(), "UTC"); + options.put(INCLUDE_COMMENTS_ENABLED.key(), "true"); + options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".products\\.*"); + Factory.Context context = + new FactoryHelper.DefaultContext( + Configuration.fromMap(options), null, this.getClass().getClassLoader()); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) dataSource.getEventSourceProvider(); + + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + String createTableSql = + String.format( + "CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n" + + " id INTEGER NOT NULL AUTO_INCREMENT COMMENT 'column comment of id' PRIMARY KEY,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'flink' COMMENT 'column comment of name',\n" + + " weight FLOAT COMMENT 'column comment of weight'\n" + + ")\n" + + "COMMENT 'table comment of products';", + inventoryDatabase.getDatabaseName(), "products_with_comments2"); + executeSql(inventoryDatabase, createTableSql); + + // add some column + String addColumnSql = + String.format( + "ALTER TABLE `%s`.`products_with_comments2` ADD COLUMN `description` VARCHAR(512) comment 'column comment of description';", + inventoryDatabase.getDatabaseName()); + executeSql(inventoryDatabase, addColumnSql); + + List expectedEvents = new ArrayList<>(); + CreateTableEvent productCreateTableEvent = getProductsCreateTableEvent(tableId); + expectedEvents.add(productCreateTableEvent); + // generate snapshot data + List productExpectedSnapshot = getSnapshotExpected(tableId); + expectedEvents.addAll(productExpectedSnapshot); + + List newTableExpectedEvents = getEventsWithComments(newTableId); + expectedEvents.addAll(newTableExpectedEvents); + + List actual = fetchResults(events, expectedEvents.size()); + assertEqualsInAnyOrder( + expectedEvents.stream().map(Object::toString).collect(Collectors.toList()), + actual.stream().map(Object::toString).collect(Collectors.toList())); + } + private void executeSql(UniqueDatabase database, String sql) throws SQLException { try (Connection connection = database.getJdbcConnection(); Statement statement = connection.createStatement()) {