From 84ef9d5daa78412b3f2f4dc087e8ea2e4be93b7c Mon Sep 17 00:00:00 2001 From: MOBIN <18814118038@163.com> Date: Sat, 10 Aug 2024 00:01:36 +0800 Subject: [PATCH] [FLINK-35981][cdc-runtime] Transform supports referencing one column more than once This closes #3515. --- .../transform/ProjectionColumnProcessor.java | 8 ++- .../transform/TransformFilterProcessor.java | 9 ++- .../transform/PostTransformOperatorTest.java | 71 +++++++++++++++++++ 3 files changed, 81 insertions(+), 7 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 3dde9b20c..a27af2370 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; /** @@ -98,7 +99,9 @@ public class ProjectionColumnProcessor { // 1 - Add referenced columns RecordData.FieldGetter[] fieldGetters = tableInfo.getPreTransformedFieldGetters(); - for (String originalColumnName : projectionColumn.getOriginalColumnNames()) { + LinkedHashSet originalColumnNames = + new LinkedHashSet<>(projectionColumn.getOriginalColumnNames()); + for (String originalColumnName : originalColumnNames) { switch (originalColumnName) { case TransformParser.DEFAULT_NAMESPACE_NAME: params.add(tableInfo.getNamespace()); @@ -142,7 +145,8 @@ public class ProjectionColumnProcessor { List> paramTypes = new ArrayList<>(); List columns = tableInfo.getPreTransformedSchema().getColumns(); String scriptExpression = projectionColumn.getScriptExpression(); - List originalColumnNames = projectionColumn.getOriginalColumnNames(); + LinkedHashSet originalColumnNames = + new LinkedHashSet<>(projectionColumn.getOriginalColumnNames()); for (String originalColumnName : originalColumnNames) { for (Column column : columns) { if (column.getName().equals(originalColumnName)) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 84d483035..d1f67818b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.stream.Stream; @@ -91,14 +92,12 @@ public class TransformFilterProcessor { List> argTypes = new ArrayList<>(); String scriptExpression = transformFilter.getScriptExpression(); List columns = tableInfo.getPreTransformedSchema().getColumns(); - List columnNames = transformFilter.getColumnNames(); + LinkedHashSet columnNames = new LinkedHashSet<>(transformFilter.getColumnNames()); for (String columnName : columnNames) { for (Column column : columns) { if (column.getName().equals(columnName)) { - if (!argNames.contains(columnName)) { - argNames.add(columnName); - argTypes.add(DataTypeConverter.convertOriginalClass(column.getType())); - } + argNames.add(columnName); + argTypes.add(DataTypeConverter.convertOriginalClass(column.getType())); break; } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 34b710374..067842c31 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -226,6 +226,16 @@ public class PostTransformOperatorTest { .options(ImmutableMap.of("key1", "value1", "key2", "value2")) .build(); + private static final TableId COLUMN_SQUARE_TABLE = + TableId.tableId("my_company", "my_branch", "column_square"); + private static final Schema COLUMN_SQUARE_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.INT()) + .physicalColumn("col2", DataTypes.INT()) + .physicalColumn("square_col2", DataTypes.INT()) + .primaryKey("col1") + .build(); + @Test void testDataChangeEventTransform() throws Exception { PostTransformOperator transform = @@ -560,6 +570,67 @@ public class PostTransformOperatorTest { .isEqualTo(new StreamRecord<>(insertEventExpect)); } + @Test + void testDataChangeEventTransformWithDuplicateColumns() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + COLUMN_SQUARE_TABLE.identifier(), + "col1, col2, col2 * col2 as square_col2", + "col2 < 3 OR col2 > 5") + .build(); + EventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + new EventOperatorTestHarness<>(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) COLUMN_SQUARE_SCHEMA.toRowDataType())); + // Insert + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {1, 1, null})); + DataChangeEvent insertEventExpect = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new Object[] {1, 1, 1})); + + DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {6, 6, null})); + DataChangeEvent insertEventExpect2 = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new Object[] {6, 6, 36})); + + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {4, 4, null})); + + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA))); + transform.processElement(new StreamRecord<>(insertEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect)); + transform.processElement(new StreamRecord<>(insertEvent2)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect2)); + transform.processElement(new StreamRecord<>(insertEvent3)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isNull(); + } + @Test void testTimestampTransform() throws Exception { PostTransformOperator transform =