[FLINK-35256][cdc][runtime] Fix transform node does not respect type nullability (#3272)

pull/3269/head
yux 10 months ago committed by GitHub
parent a513e9f82e
commit 6258bec5bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -19,6 +19,7 @@ package org.apache.flink.cdc.runtime.parser;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
@ -154,6 +155,12 @@ public class TransformParser {
.collect(
Collectors.toMap(
RelDataTypeField::getName, RelDataTypeField::getType));
Map<String, Boolean> isNotNullMap =
columns.stream()
.collect(
Collectors.toMap(
Column::getName, column -> !column.getType().isNullable()));
List<ProjectionColumn> projectionColumns = new ArrayList<>();
for (SqlNode sqlNode : sqlSelect.getSelectList()) {
if (sqlNode instanceof SqlBasicCall) {
@ -205,21 +212,27 @@ public class TransformParser {
} else if (sqlNode instanceof SqlIdentifier) {
SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
DataType columnType =
DataTypeConverter.convertCalciteRelDataTypeToDataType(
relDataTypeMap.get(columnName));
if (isMetadataColumn(columnName)) {
projectionColumns.add(
ProjectionColumn.of(
columnName,
DataTypeConverter.convertCalciteRelDataTypeToDataType(
relDataTypeMap.get(columnName)),
// Metadata columns should never be null
columnType.notNull(),
columnName,
columnName,
Arrays.asList(columnName)));
} else {
// Calcite translated column type doesn't keep nullability.
// Appending it manually to circumvent this problem.
projectionColumns.add(
ProjectionColumn.of(
columnName,
DataTypeConverter.convertCalciteRelDataTypeToDataType(
relDataTypeMap.get(columnName))));
isNotNullMap.get(columnName)
? columnType.notNull()
: columnType.nullable()));
}
} else {
throw new ParseException("Unrecognized projection: " + sqlNode.toString());

@ -74,6 +74,26 @@ public class TransformSchemaOperatorTest {
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
.build();
private static final Schema NULLABILITY_SCHEMA =
Schema.newBuilder()
.physicalColumn("id", DataTypes.STRING().notNull())
.physicalColumn("name", DataTypes.STRING())
.primaryKey("id")
.partitionKey("id")
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
.build();
private static final Schema EXPECTED_NULLABILITY_SCHEMA =
Schema.newBuilder()
.physicalColumn("id", DataTypes.STRING().notNull())
.physicalColumn("uid", DataTypes.STRING())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("uname", DataTypes.STRING())
.primaryKey("id")
.partitionKey("id")
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
.build();
@Test
void testEventTransform() throws Exception {
TransformSchemaOperator transform =
@ -176,4 +196,33 @@ public class TransformSchemaOperatorTest {
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(updateEventExpect));
}
@Test
public void testNullabilityColumn() throws Exception {
TransformSchemaOperator transform =
TransformSchemaOperator.newBuilder()
.addTransform(
CUSTOMERS_TABLEID.identifier(),
"id, upper(id) uid, name, upper(name) uname",
"id",
"id",
"key1=value1,key2=value2")
.build();
EventOperatorTestHarness<TransformSchemaOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
transformFunctionEventEventOperatorTestHarness.open();
// Create table
CreateTableEvent createTableEvent =
new CreateTableEvent(CUSTOMERS_TABLEID, NULLABILITY_SCHEMA);
transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
new CreateTableEvent(
CUSTOMERS_TABLEID, EXPECTED_NULLABILITY_SCHEMA)));
}
}

Loading…
Cancel
Save