[FLINK-37012][transform] Fix argument type mismatch when metadata column used in function

This closes #3837
pull/3784/head^2
MOBIN 3 weeks ago committed by GitHub
parent 085684b773
commit f7f3901fa3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -186,9 +186,7 @@ public class ProjectionColumnProcessor {
break;
}
}
}
for (String originalColumnName : originalColumnNames) {
METADATA_COLUMNS.stream()
.filter(col -> col.f0.equals(originalColumnName))
.findFirst()

@ -95,6 +95,10 @@ public class PostTransformOperatorTest {
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("name_upper", DataTypes.STRING())
.physicalColumn("tbname", DataTypes.STRING().notNull())
.physicalColumn("tbname_sid", DataTypes.STRING())
.physicalColumn("sid_tbname", DataTypes.STRING())
.physicalColumn("tbname_name", DataTypes.STRING())
.physicalColumn("name_tbname", DataTypes.STRING())
.primaryKey("sid")
.build();
@ -543,7 +547,9 @@ public class PostTransformOperatorTest {
PostTransformOperator.newBuilder()
.addTransform(
METADATA_AS_TABLEID.identifier(),
"sid, name, UPPER(name) as name_upper, __table_name__ as tbname",
"sid, name, UPPER(name) as name_upper, __table_name__ as tbname, "
+ "concat(__table_name__,'_',sid) as tbname_sid, concat(sid,'_',__table_name__) as sid_tbname,"
+ "concat(__table_name__,'_',name) as tbname_name, concat(name,'_',__table_name__) as name_tbname",
"sid < 3")
.build();
RegularEventOperatorTestHarness<PostTransformOperator, Event>
@ -561,7 +567,16 @@ public class PostTransformOperatorTest {
DataChangeEvent.insertEvent(
METADATA_AS_TABLEID,
recordDataGenerator.generate(
new Object[] {1, new BinaryStringData("abc"), null, null}));
new Object[] {
1,
new BinaryStringData("abc"),
null,
null,
null,
null,
null,
null
}));
DataChangeEvent insertEventExpect =
DataChangeEvent.insertEvent(
METADATA_AS_TABLEID,
@ -570,7 +585,11 @@ public class PostTransformOperatorTest {
1,
new BinaryStringData("abc"),
new BinaryStringData("ABC"),
new BinaryStringData("metadata_as_table")
new BinaryStringData("metadata_as_table"),
new BinaryStringData("metadata_as_table_1"),
new BinaryStringData("1_metadata_as_table"),
new BinaryStringData("metadata_as_table_abc"),
new BinaryStringData("abc_metadata_as_table")
}));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(

Loading…
Cancel
Save