From 999258436595759556daecddd9c158899b3c60e8 Mon Sep 17 00:00:00 2001
From: moses <72908278+ChaomingZhangCN@users.noreply.github.com>
Date: Tue, 14 Jan 2025 19:19:06 +0800
Subject: [PATCH] [FLINK-37011][cdc-transform] Improve get source field value
 by column name in PreTransformProcessor

Source field getters are now indexed by column name in
PreTransformChangeInfo, so PreTransformProcessor no longer scans the source
columns for every pre-transformed column. A column without a source getter
still yields null, as the removed getValueFromBinaryRecordData helper did.
Field values are no longer passed through DataTypeConverter.convert.

This closes #3836
---
 .../flink/cdc/common/schema/Schema.java       |  2 +-
 .../transform/PreTransformChangeInfo.java     | 23 +++++++++-----
 .../transform/PreTransformProcessor.java      | 31 +++++--------------
 3 files changed, 24 insertions(+), 32 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
index 01ca8906b..e452ba547 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
@@ -164,7 +164,7 @@ public class Schema implements Serializable {
         return DataTypes.ROW(fields).notNull();
     }
 
-    /** Returns a copy of the schema with a replaced list of {@Column}. */
+    /** Returns a copy of the schema with a replaced list of {@link Column}. */
     public Schema copy(List<Column> columns) {
         return new Schema(
                 columns,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java
index 850eb4cc4..f1bdfde08 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java
@@ -34,18 +34,20 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * PreTransformChangeInfo caches source / pre-transformed schema, source schema field getters, and
  * binary record data generator for pre-transform schema.
  */
 public class PreTransformChangeInfo {
-    private TableId tableId;
-    private Schema sourceSchema;
-    private Schema preTransformedSchema;
-    private RecordData.FieldGetter[] sourceFieldGetters;
-    private BinaryRecordDataGenerator preTransformedRecordDataGenerator;
+    private final TableId tableId;
+    private final Schema sourceSchema;
+    private final Schema preTransformedSchema;
+    private final Map<String, RecordData.FieldGetter> sourceFieldGettersMap;
+    private final BinaryRecordDataGenerator preTransformedRecordDataGenerator;
 
     public static final PreTransformChangeInfo.Serializer SERIALIZER =
             new PreTransformChangeInfo.Serializer();
@@ -54,12 +56,17 @@ public class PreTransformChangeInfo {
             TableId tableId,
             Schema sourceSchema,
             Schema preTransformedSchema,
             RecordData.FieldGetter[] sourceFieldGetters,
             BinaryRecordDataGenerator preTransformedRecordDataGenerator) {
         this.tableId = tableId;
         this.sourceSchema = sourceSchema;
         this.preTransformedSchema = preTransformedSchema;
-        this.sourceFieldGetters = sourceFieldGetters;
+        // Index getters by column name; sourceFieldGetters[i] belongs to the i-th source column.
+        this.sourceFieldGettersMap = new HashMap<>(sourceSchema.getColumnCount());
+        for (int i = 0; i < sourceSchema.getColumns().size(); i++) {
+            this.sourceFieldGettersMap.put(
+                    sourceSchema.getColumns().get(i).getName(), sourceFieldGetters[i]);
+        }
         this.preTransformedRecordDataGenerator = preTransformedRecordDataGenerator;
     }
 
@@ -87,8 +94,8 @@ public class PreTransformChangeInfo {
         return preTransformedSchema;
     }
 
-    public RecordData.FieldGetter[] getSourceFieldGetters() {
-        return sourceFieldGetters;
+    public Map<String, RecordData.FieldGetter> getSourceFieldGettersMap() {
+        return sourceFieldGettersMap;
     }
 
     public BinaryRecordDataGenerator getPreTransformedRecordDataGenerator() {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
index d4f6fec7d..cd679fcd5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
@@ -22,10 +22,10 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * The processor of pre-transform projection in {@link PreTransformOperator}.
@@ -40,6 +40,6 @@ import java.util.List;
  */
 public class PreTransformProcessor {
-    private PreTransformChangeInfo tableChangeInfo;
+    private final PreTransformChangeInfo tableChangeInfo;
 
     public PreTransformProcessor(PreTransformChangeInfo tableChangeInfo) {
         this.tableChangeInfo = tableChangeInfo;
@@ -62,30 +62,15 @@ public class PreTransformProcessor {
     public BinaryRecordData processFillDataField(BinaryRecordData data) {
         List<Object> valueList = new ArrayList<>();
         List<Column> columns = tableChangeInfo.getPreTransformedSchema().getColumns();
-        for (int i = 0; i < columns.size(); i++) {
-            valueList.add(
-                    getValueFromBinaryRecordData(
-                            columns.get(i).getName(),
-                            data,
-                            tableChangeInfo.getSourceSchema().getColumns(),
-                            tableChangeInfo.getSourceFieldGetters()));
+        Map<String, RecordData.FieldGetter> sourceFieldGettersMap =
+                tableChangeInfo.getSourceFieldGettersMap();
+        for (Column column : columns) {
+            RecordData.FieldGetter fieldGetter = sourceFieldGettersMap.get(column.getName());
+            // A column without a source getter yields null, as the removed helper did.
+            valueList.add(fieldGetter == null ? null : fieldGetter.getFieldOrNull(data));
         }
         return tableChangeInfo
                 .getPreTransformedRecordDataGenerator()
                 .generate(valueList.toArray(new Object[0]));
     }
-
-    private Object getValueFromBinaryRecordData(
-            String columnName,
-            BinaryRecordData binaryRecordData,
-            List<Column> columns,
-            RecordData.FieldGetter[] fieldGetters) {
-        for (int i = 0; i < columns.size(); i++) {
-            if (columnName.equals(columns.get(i).getName())) {
-                return DataTypeConverter.convert(
-                        fieldGetters[i].getFieldOrNull(binaryRecordData), columns.get(i).getType());
-            }
-        }
-        return null;
-    }
 }