diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java index 58dfa641b..0302ed1bd 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java @@ -116,7 +116,7 @@ public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, Sch + typeMapping + ", oldTypeMapping=" + oldTypeMapping - + ", comments='" + + ", comments=" + comments + '}'; } else { @@ -125,7 +125,7 @@ public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, Sch + tableId + ", typeMapping=" + typeMapping - + ", comments='" + + ", comments=" + comments + '}'; } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 55a205d49..432649649 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -836,7 +836,7 @@ class FlinkPipelineComposerITCase { "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=age}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, student], after=[], op=DELETE, meta=()}", @@ -1047,7 +1047,7 @@ class FlinkPipelineComposerITCase { "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}", @@ -1254,7 +1254,7 @@ class FlinkPipelineComposerITCase { .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name, null], after=[2, Bob, 30, last_name, null], op=UPDATE, meta=()}", @@ -1398,7 +1398,7 @@ class FlinkPipelineComposerITCase { // Merging timestamp with different precision "CreateTableEvent{tableId={}_table_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_table_timestamp_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}", + "AlterColumnTypeEvent{tableId={}_table_timestamp_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}, comments={}}", "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", @@ -1406,7 +1406,7 @@ class FlinkPipelineComposerITCase { // Merging zoned timestamp with different precision "CreateTableEvent{tableId={}_table_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0) WITH TIME ZONE}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[3, Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(0) WITH TIME ZONE}}", + "AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(0) WITH TIME ZONE}, comments={}}", "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[4, Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[103, Zen, 19, 2020-01-01T14:28:57Z], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[104, Zen, 19, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", @@ -1414,7 +1414,7 @@ class FlinkPipelineComposerITCase { // Merging local-zoned timestamp with different precision "CreateTableEvent{tableId={}_table_local_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP_LTZ(0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[5, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP_LTZ(9)}, oldTypeMapping={birthday=TIMESTAMP_LTZ(0)}}", + "AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP_LTZ(9)}, oldTypeMapping={birthday=TIMESTAMP_LTZ(0)}, comments={}}", "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[6, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[105, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[106, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", @@ -1422,9 +1422,9 @@ class FlinkPipelineComposerITCase { // Merging all "CreateTableEvent{tableId={}_everything_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}", + "AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}, comments={}}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(9)}}", + "AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(9)}, comments={}}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[3, Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[4, Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", "DataChangeEvent{tableId={}_everything_merged, before=[], after=[5, Alice, 17, 2020-01-01T04:28:57-05:00], op=INSERT, meta=()}", @@ -1494,18 +1494,18 @@ class FlinkPipelineComposerITCase { Stream.of( "CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=SMALLINT}, oldTypeMapping={fav_num=TINYINT}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=SMALLINT}, oldTypeMapping={fav_num=TINYINT}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=INT}, oldTypeMapping={fav_num=SMALLINT}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=INT}, oldTypeMapping={fav_num=SMALLINT}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[3, Alice, 17, 3333], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=INT}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=INT}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[4, Alice, 17, 44444444], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}", "DataChangeEvent{tableId={}, before=[], after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}, comments={}}", "DataChangeEvent{tableId={}, before=[], after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}", "DataChangeEvent{tableId={}, before=[], after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}", "DataChangeEvent{tableId={}, before=[], after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}", diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java index ad541f447..af37aa5d1 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java @@ -865,7 +865,7 @@ class FlinkPipelineComposerLenientITCase { "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, student], after=[], op=DELETE, meta=()}", @@ -1073,7 +1073,7 @@ class FlinkPipelineComposerLenientITCase { "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}", diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 2ac394add..d8692c126 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -1145,7 +1145,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1240,7 +1240,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1329,7 +1329,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5, Eve, 5 -> Eve], after=[5, Eva, 5 -> Eva], op=UPDATE, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6, Fiona, 6 -> Fiona], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6, Fiona, 6 -> Fiona], after=[], op=DELETE, meta=()}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={name=VARCHAR(17)}, oldTypeMapping={name=STRING}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={name=VARCHAR(17)}, oldTypeMapping={name=STRING}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7, Gem, 7 -> Gem], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8, Helen, 8 -> Helen], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8, Helen, 8 -> Helen], after=[8, Harry, 8 -> Harry], op=UPDATE, meta=()}", @@ -1418,7 +1418,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 6 -> Fiona], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 7 -> Gem], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 8 -> Helen], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 8 -> Helen], after=[5th, 8, Harry, 18.0, -3, 8 -> Harry], op=UPDATE, meta=()}", @@ -1513,7 +1513,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6 -> Fiona, 3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7 -> Gem, 4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8 -> Helen, 5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8 -> Helen, 5th, 8, Helen, 18.0, -2], after=[8 -> Harry, 5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1608,7 +1608,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1704,7 +1704,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -1799,7 +1799,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", @@ -2160,7 +2160,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}", // Alter column type - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}, comments={}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[5th, 8, Harry, 18.0, -3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index 976522b60..d8cb28993 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -324,7 +324,7 @@ public class MysqlE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", - "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}", + "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}, comments={}}", "DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}", "DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", @@ -470,7 +470,7 @@ public class MysqlE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1, -U], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1, +U], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1, -U], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1, +U], op=UPDATE, meta=()}", "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1, -D], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}", + "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}, comments={}}", "DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649, +I], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}", "DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649, +I], op=INSERT, meta=()}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index cd342ef46..a3b78134a 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -226,7 +226,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}"); @@ -311,7 +311,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.ALL, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "AddColumnEvent{tableId=%s.ALL, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=NAME}]}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10003, null, null, Fluorite], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10004, null, null, null], op=INSERT, meta=()}"); } @@ -405,7 +405,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "AddColumnEvent{tableId=NEW_%s.ALPHABET, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=NAME}]}", "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", @@ -512,7 +512,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { "AddColumnEvent{tableId=NEW_%s.BETAGAMM, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=VERSION}]}", "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10002, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10003, null, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}"); @@ -625,7 +625,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=NEW_%s.TABLEC, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=%s.TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=%s.TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", @@ -716,7 +716,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.ALL, before=[], after=[10001, 12, extras, Derrida], op=INSERT, meta=()}", "AddColumnEvent{tableId=%s.ALL, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=NAME}]}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10002, null, extras, null, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=%s.ALL, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10003, null, extras, null, Fluorite], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.ALL, before=[], after=[10004, null, extras, null, null], op=INSERT, meta=()}"); } @@ -811,7 +811,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}, comments={}}", "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, droppedColumnNames=[VERSION]}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 98e4deba9..aab3e0ae6 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -96,7 +96,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", @@ -116,7 +116,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.merged, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.merged, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=AFTER, existedColumnName=gender}]}", "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=AFTER, existedColumnName=precise_age}]}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", @@ -151,7 +151,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null], op=INSERT, meta=()}"), Arrays.asList( "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", - "Failed to apply schema change AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", + "Failed to apply schema change AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", @@ -197,7 +197,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", @@ -220,7 +220,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, null], op=INSERT, meta=()}", @@ -309,7 +309,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.redirect, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.redirect, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.redirect, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", "AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.redirect, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", @@ -437,7 +437,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java index 7523387cf..bd9056802 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java @@ -97,7 +97,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20], op=INSERT, meta=()}", @@ -116,7 +116,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=tag}]}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.merged, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.merged, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.merged, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=AFTER, existedColumnName=gender}]}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.merged, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}")); @@ -151,7 +151,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, null, 1028196, age < 20], op=INSERT, meta=()}"), Arrays.asList( "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", - "Failed to apply schema change AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", + "Failed to apply schema change AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "Failed to apply schema change TruncateTableEvent{tableId=%s.members}, but keeps running in tolerant mode. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=TruncateTableEvent{tableId=%s.members}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", @@ -195,7 +195,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20, 0], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}")); @@ -212,7 +212,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { Arrays.asList( "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}, comments={}}", "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, null, 1026169, age < 20], op=INSERT, meta=()}", "TruncateTableEvent{tableId=%s.members}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 8b292ff85..0eaf9a794 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -994,7 +994,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment { validateEvents( "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`LAST` VARCHAR(17), position=AFTER, existedColumnName=NAMEALPHA}]}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3008, 8, 8, 80, 17, Jazz, Last, id -> 3008], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}", + "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3009, 9, 9.0, 90, 18, Keka, Finale, id -> 3009], op=INSERT, meta=()}", @@ -1103,7 +1103,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment { "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`CODENAME` TINYINT, position=AFTER, existedColumnName=VERSION}]}", "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`FIRST` VARCHAR(17), position=BEFORE, existedColumnName=ID}]}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3008 <- id, First, 3008, 8, 8, 80, 17, Jazz], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}", + "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}, comments={}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}", "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}", "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3009 <- id, 1st, 3009, 9, 9.0, 90, 18, Keka], op=INSERT, meta=()}",