[FLINK-37011][cdc-transform] Improve get source field value by column name in PreTransformProcessor

This closes #3836
pull/3723/head
moses 2 weeks ago committed by GitHub
parent 0f675061e1
commit 9992584365
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -164,7 +164,7 @@ public class Schema implements Serializable {
return DataTypes.ROW(fields).notNull();
}
/** Returns a copy of the schema with a replaced list of {@Column}. */
/** Returns a copy of the schema with a replaced list of {@link Column}. */
public Schema copy(List<Column> columns) {
return new Schema(
columns,

@ -34,18 +34,20 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* PreTransformChangeInfo caches source / pre-transformed schema, source schema field getters, and
* binary record data generator for pre-transform schema.
*/
public class PreTransformChangeInfo {
private TableId tableId;
private Schema sourceSchema;
private Schema preTransformedSchema;
private RecordData.FieldGetter[] sourceFieldGetters;
private BinaryRecordDataGenerator preTransformedRecordDataGenerator;
private final TableId tableId;
private final Schema sourceSchema;
private final Schema preTransformedSchema;
private final Map<String, RecordData.FieldGetter> sourceFieldGettersMap;
private final BinaryRecordDataGenerator preTransformedRecordDataGenerator;
public static final PreTransformChangeInfo.Serializer SERIALIZER =
new PreTransformChangeInfo.Serializer();
@ -54,12 +56,16 @@ public class PreTransformChangeInfo {
TableId tableId,
Schema sourceSchema,
Schema preTransformedSchema,
RecordData.FieldGetter[] sourceFieldGetters,
RecordData.FieldGetter[] sourceFieldGettersMap,
BinaryRecordDataGenerator preTransformedRecordDataGenerator) {
this.tableId = tableId;
this.sourceSchema = sourceSchema;
this.preTransformedSchema = preTransformedSchema;
this.sourceFieldGetters = sourceFieldGetters;
this.sourceFieldGettersMap = new HashMap<>(sourceSchema.getColumnCount());
for (int i = 0; i < sourceSchema.getColumns().size(); i++) {
this.sourceFieldGettersMap.put(
sourceSchema.getColumns().get(i).getName(), sourceFieldGettersMap[i]);
}
this.preTransformedRecordDataGenerator = preTransformedRecordDataGenerator;
}
@ -87,8 +93,8 @@ public class PreTransformChangeInfo {
return preTransformedSchema;
}
public RecordData.FieldGetter[] getSourceFieldGetters() {
return sourceFieldGetters;
public Map<String, RecordData.FieldGetter> getSourceFieldGettersMap() {
return sourceFieldGettersMap;
}
public BinaryRecordDataGenerator getPreTransformedRecordDataGenerator() {

@ -22,10 +22,10 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* The processor of pre-transform projection in {@link PreTransformOperator}.
@ -39,7 +39,7 @@ import java.util.List;
* </ul>
*/
public class PreTransformProcessor {
private PreTransformChangeInfo tableChangeInfo;
private final PreTransformChangeInfo tableChangeInfo;
public PreTransformProcessor(PreTransformChangeInfo tableChangeInfo) {
this.tableChangeInfo = tableChangeInfo;
@ -62,30 +62,14 @@ public class PreTransformProcessor {
public BinaryRecordData processFillDataField(BinaryRecordData data) {
List<Object> valueList = new ArrayList<>();
List<Column> columns = tableChangeInfo.getPreTransformedSchema().getColumns();
for (int i = 0; i < columns.size(); i++) {
valueList.add(
getValueFromBinaryRecordData(
columns.get(i).getName(),
data,
tableChangeInfo.getSourceSchema().getColumns(),
tableChangeInfo.getSourceFieldGetters()));
Map<String, RecordData.FieldGetter> sourceFieldGettersMap =
tableChangeInfo.getSourceFieldGettersMap();
for (Column column : columns) {
RecordData.FieldGetter fieldGetter = sourceFieldGettersMap.get(column.getName());
valueList.add(fieldGetter.getFieldOrNull(data));
}
return tableChangeInfo
.getPreTransformedRecordDataGenerator()
.generate(valueList.toArray(new Object[0]));
}
private Object getValueFromBinaryRecordData(
String columnName,
BinaryRecordData binaryRecordData,
List<Column> columns,
RecordData.FieldGetter[] fieldGetters) {
for (int i = 0; i < columns.size(); i++) {
if (columnName.equals(columns.get(i).getName())) {
return DataTypeConverter.convert(
fieldGetters[i].getFieldOrNull(binaryRecordData), columns.get(i).getType());
}
}
return null;
}
}

Loading…
Cancel
Save