[FLINK-34865][pipeline-connector/mysql] Support sync newly added table's comment

This closes #3869
pull/3679/merge
North Lin 2 weeks ago committed by GitHub
parent bbef6213a6
commit 8815f2b879
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -122,6 +122,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
if (tableEditor.hasPrimaryKey()) {
builder.primaryKey(tableEditor.primaryKeyColumnNames());
}
builder.comment(tableEditor.create().comment());
changes.add(
new CreateTableEvent(
toCdcTableId(tableEditor.tableId()), builder.build()));
@ -413,6 +414,21 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
super.exitDropTable(ctx);
}
@Override
public void enterTableOptionComment(MySqlParser.TableOptionCommentContext ctx) {
if (!parser.skipComments()) {
parser.runIfNotNull(
() -> {
if (ctx.COMMENT() != null) {
tableEditor.setComment(
parser.withoutQuotes(ctx.STRING_LITERAL().getText()));
}
},
tableEditor);
}
super.enterTableOptionComment(ctx);
}
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(),

@ -79,6 +79,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
@ -1076,6 +1077,75 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
actual.stream().map(Object::toString).collect(Collectors.toList()));
}
@Test
public void testIncludeCommentsForScanBinlogNewlyAddedTableEnabled() throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products");
TableId newTableId =
TableId.tableId(inventoryDatabase.getDatabaseName(), "products_with_comments2");
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL8_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL8_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(SERVER_TIME_ZONE.key(), "UTC");
options.put(INCLUDE_COMMENTS_ENABLED.key(), "true");
options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".products\\.*");
Factory.Context context =
new FactoryHelper.DefaultContext(
Configuration.fromMap(options), null, this.getClass().getClassLoader());
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) dataSource.getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
Thread.sleep(5_000);
String createTableSql =
String.format(
"CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
+ " id INTEGER NOT NULL AUTO_INCREMENT COMMENT 'column comment of id' PRIMARY KEY,\n"
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink' COMMENT 'column comment of name',\n"
+ " weight FLOAT COMMENT 'column comment of weight'\n"
+ ")\n"
+ "COMMENT 'table comment of products';",
inventoryDatabase.getDatabaseName(), "products_with_comments2");
executeSql(inventoryDatabase, createTableSql);
// add some column
String addColumnSql =
String.format(
"ALTER TABLE `%s`.`products_with_comments2` ADD COLUMN `description` VARCHAR(512) comment 'column comment of description';",
inventoryDatabase.getDatabaseName());
executeSql(inventoryDatabase, addColumnSql);
List<Event> expectedEvents = new ArrayList<>();
CreateTableEvent productCreateTableEvent = getProductsCreateTableEvent(tableId);
expectedEvents.add(productCreateTableEvent);
// generate snapshot data
List<Event> productExpectedSnapshot = getSnapshotExpected(tableId);
expectedEvents.addAll(productExpectedSnapshot);
List<Event> newTableExpectedEvents = getEventsWithComments(newTableId);
expectedEvents.addAll(newTableExpectedEvents);
List<Event> actual = fetchResults(events, expectedEvents.size());
assertEqualsInAnyOrder(
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
actual.stream().map(Object::toString).collect(Collectors.toList()));
}
private void executeSql(UniqueDatabase database, String sql) throws SQLException {
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {

Loading…
Cancel
Save