diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index cde5aa0c5..79583d83e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -234,6 +234,32 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { super.enterAlterByRenameColumn(ctx); } + @Override + public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + String oldColumnName = parser.parseName(ctx.uid(0)); + ColumnEditor columnEditor = Column.editor().name(oldColumnName); + columnEditor.unsetDefaultValueExpression(); + + columnDefinitionListener = + new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + super.enterAlterByModifyColumn(ctx); + } + + @Override + public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { + parser.runIfNotNull( + () -> { + Column column = columnDefinitionListener.getColumn(); + Map typeMapping = new HashMap<>(); + typeMapping.put(column.name(), fromDbzColumn(column)); + changes.add(new AlterColumnTypeEvent(currentTable, typeMapping)); + listeners.remove(columnDefinitionListener); + }, + columnDefinitionListener); + super.exitAlterByModifyColumn(ctx); + } + @Override public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) { parser.runIfNotNull( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 076e57364..bd2c059bb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -554,6 +554,14 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { inventoryDatabase.getDatabaseName())); expected.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc1", "desc3"))); + statement.execute( + String.format( + "ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("DESC3", DataTypes.VARCHAR(255)))); + statement.execute( String.format( "ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;",