From f7f3901fa38a0e1074609545703d19e99601aa5a Mon Sep 17 00:00:00 2001 From: MOBIN <18814118038@163.com> Date: Mon, 13 Jan 2025 15:23:58 +0800 Subject: [PATCH] [FLINK-37012][transform] Fix argument type mismatch when metadata column used in function This closes #3837 --- .../transform/ProjectionColumnProcessor.java | 2 -- .../transform/PostTransformOperatorTest.java | 25 ++++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 292ba416a..83de37926 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -186,9 +186,7 @@ public class ProjectionColumnProcessor { break; } } - } - for (String originalColumnName : originalColumnNames) { METADATA_COLUMNS.stream() .filter(col -> col.f0.equals(originalColumnName)) .findFirst() diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index a7dc61ac4..2de893a09 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -95,6 +95,10 @@ public class PostTransformOperatorTest { .physicalColumn("name", DataTypes.STRING()) .physicalColumn("name_upper", DataTypes.STRING()) .physicalColumn("tbname", DataTypes.STRING().notNull()) + .physicalColumn("tbname_sid", DataTypes.STRING()) + .physicalColumn("sid_tbname", DataTypes.STRING()) + .physicalColumn("tbname_name", DataTypes.STRING()) + .physicalColumn("name_tbname", DataTypes.STRING()) .primaryKey("sid") .build(); @@ -543,7 +547,9 @@ public class PostTransformOperatorTest { PostTransformOperator.newBuilder() .addTransform( METADATA_AS_TABLEID.identifier(), - "sid, name, UPPER(name) as name_upper, __table_name__ as tbname", + "sid, name, UPPER(name) as name_upper, __table_name__ as tbname, " + + "concat(__table_name__,'_',sid) as tbname_sid, concat(sid,'_',__table_name__) as sid_tbname," + + "concat(__table_name__,'_',name) as tbname_name, concat(name,'_',__table_name__) as name_tbname", "sid < 3") .build(); RegularEventOperatorTestHarness @@ -561,7 +567,16 @@ public class PostTransformOperatorTest { DataChangeEvent.insertEvent( METADATA_AS_TABLEID, recordDataGenerator.generate( - new Object[] {1, new BinaryStringData("abc"), null, null})); + new Object[] { + 1, + new BinaryStringData("abc"), + null, + null, + null, + null, + null, + null + })); DataChangeEvent insertEventExpect = DataChangeEvent.insertEvent( METADATA_AS_TABLEID, @@ -570,7 +585,11 @@ public class PostTransformOperatorTest { 1, new BinaryStringData("abc"), new BinaryStringData("ABC"), - new BinaryStringData("metadata_as_table") + new BinaryStringData("metadata_as_table"), + new BinaryStringData("metadata_as_table_1"), + new BinaryStringData("1_metadata_as_table"), + new BinaryStringData("metadata_as_table_abc"), + new BinaryStringData("abc_metadata_as_table") })); transform.processElement(new StreamRecord<>(createTableEvent)); Assertions.assertThat(