diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java index 7235f2b67..3e7634147 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java @@ -44,15 +44,13 @@ public class TransformTranslator { PreTransformOperator.Builder preTransformFunctionBuilder = PreTransformOperator.newBuilder(); for (TransformDef transform : transforms) { - if (transform.isValidProjection()) { - preTransformFunctionBuilder.addTransform( - transform.getSourceTable(), - transform.getProjection().orElse(null), - transform.getFilter().orElse(null), - transform.getPrimaryKeys(), - transform.getPartitionKeys(), - transform.getTableOptions()); - } + preTransformFunctionBuilder.addTransform( + transform.getSourceTable(), + transform.getProjection().orElse(null), + transform.getFilter().orElse(null), + transform.getPrimaryKeys(), + transform.getPartitionKeys(), + transform.getTableOptions()); } preTransformFunctionBuilder.addUdfFunctions( udfFunctions.stream() diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index ebec5e160..e16a2745b 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -287,7 +287,6 @@ class FlinkPipelineTransformITCase { */ @ParameterizedTest @EnumSource - @Disabled("This doesn't work until FLINK-35982 got fixed.") void testMetadataInfoWithoutChangingSchema(ValuesDataSink.SinkApi sinkApi) throws Exception { runGenericTransformTest( sinkApi, @@ -299,13 +298,13 @@ class FlinkPipelineTransformITCase { "id,name", "id", "replication_num=1,bucket=17", - "Just a Transform Block")), + "A Transform Block without projection or filter")), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}")); @@ -506,7 +505,7 @@ class FlinkPipelineTransformITCase { + "LOWER(name) AS col4, " + "TRIM(name) AS col5, " + "REGEXP_REPLACE(name, 'Al|Bo', '**') AS col6, " - + "SUBSTR(name, 0, 1) AS col7, " + + "SUBSTR(name, 1, 1) AS col7, " + "SUBSTR(name, 2, 1) AS col8, " + "SUBSTR(name, 3) AS col9, " + "CONCAT(name, ' - ', CAST(id AS VARCHAR)) AS col10", @@ -517,13 +516,13 @@ class FlinkPipelineTransformITCase { null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, Dear Alice, 5, ALICE, alice, Alice, **ice, A, i, ce, Alice - 1], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], after=[2, Bob, 30, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, Dear Alice, 5, ALICE, alice, Alice, **ice, A, l, ice, Alice - 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], after=[2, Bob, 30, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], op=UPDATE, meta=()}", "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Dear Carol, 5, CAROL, carol, Carol, Carol, C, r, ol, Carol - 3], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, r, rida, Derrida - 4], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, r, rida, Derrida - 4], after=[], op=DELETE, meta=()}")); + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Dear Carol, 5, CAROL, carol, Carol, Carol, C, a, rol, Carol - 3], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, e, rrida, Derrida - 4], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, e, rrida, Derrida - 4], after=[], op=DELETE, meta=()}")); } @ParameterizedTest