[FLINK-35981][cdc-runtime] Transform supports referencing one column more than once

This closes  #3515.
pull/3529/head
MOBIN 6 months ago committed by GitHub
parent b361db58ab
commit 84ef9d5daa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
/**
@ -98,7 +99,9 @@ public class ProjectionColumnProcessor {
// 1 - Add referenced columns
RecordData.FieldGetter[] fieldGetters = tableInfo.getPreTransformedFieldGetters();
for (String originalColumnName : projectionColumn.getOriginalColumnNames()) {
LinkedHashSet<String> originalColumnNames =
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
switch (originalColumnName) {
case TransformParser.DEFAULT_NAMESPACE_NAME:
params.add(tableInfo.getNamespace());
@ -142,7 +145,8 @@ public class ProjectionColumnProcessor {
List<Class<?>> paramTypes = new ArrayList<>();
List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
String scriptExpression = projectionColumn.getScriptExpression();
List<String> originalColumnNames = projectionColumn.getOriginalColumnNames();
LinkedHashSet<String> originalColumnNames =
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
for (Column column : columns) {
if (column.getName().equals(originalColumnName)) {

@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Stream;
@ -91,14 +92,12 @@ public class TransformFilterProcessor {
List<Class<?>> argTypes = new ArrayList<>();
String scriptExpression = transformFilter.getScriptExpression();
List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
List<String> columnNames = transformFilter.getColumnNames();
LinkedHashSet<String> columnNames = new LinkedHashSet<>(transformFilter.getColumnNames());
for (String columnName : columnNames) {
for (Column column : columns) {
if (column.getName().equals(columnName)) {
if (!argNames.contains(columnName)) {
argNames.add(columnName);
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
}
argNames.add(columnName);
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
break;
}
}

@ -226,6 +226,16 @@ public class PostTransformOperatorTest {
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
.build();
private static final TableId COLUMN_SQUARE_TABLE =
TableId.tableId("my_company", "my_branch", "column_square");
private static final Schema COLUMN_SQUARE_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.INT())
.physicalColumn("col2", DataTypes.INT())
.physicalColumn("square_col2", DataTypes.INT())
.primaryKey("col1")
.build();
@Test
void testDataChangeEventTransform() throws Exception {
PostTransformOperator transform =
@ -560,6 +570,67 @@ public class PostTransformOperatorTest {
.isEqualTo(new StreamRecord<>(insertEventExpect));
}
@Test
void testDataChangeEventTransformWithDuplicateColumns() throws Exception {
PostTransformOperator transform =
PostTransformOperator.newBuilder()
.addTransform(
COLUMN_SQUARE_TABLE.identifier(),
"col1, col2, col2 * col2 as square_col2",
"col2 < 3 OR col2 > 5")
.build();
EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
transformFunctionEventEventOperatorTestHarness.open();
// Create table
CreateTableEvent createTableEvent =
new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA);
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(((RowType) COLUMN_SQUARE_SCHEMA.toRowDataType()));
// Insert
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE,
recordDataGenerator.generate(new Object[] {1, 1, null}));
DataChangeEvent insertEventExpect =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new Object[] {1, 1, 1}));
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE,
recordDataGenerator.generate(new Object[] {6, 6, null}));
DataChangeEvent insertEventExpect2 =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new Object[] {6, 6, 36}));
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE,
recordDataGenerator.generate(new Object[] {4, 4, null}));
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA)));
transform.processElement(new StreamRecord<>(insertEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect));
transform.processElement(new StreamRecord<>(insertEvent2));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect2));
transform.processElement(new StreamRecord<>(insertEvent3));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isNull();
}
@Test
void testTimestampTransform() throws Exception {
PostTransformOperator transform =

Loading…
Cancel
Save