validate column name and comment

pull/3865/head
MOBIN-F 3 weeks ago
parent 4ef3316a33
commit 9a4e9678a2

@@ -78,6 +78,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;

 import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
@@ -178,14 +179,14 @@ public class SchemaMergingUtils {
         } else {
             Schema outputSchema = null;
             for (Schema schema : schemas) {
-                validateTransformColumnCounts(outputSchema, schema);
+                validateTransformColumn(outputSchema, schema);
                 outputSchema = getLeastCommonSchema(outputSchema, schema);
             }
             return outputSchema;
         }
     }

-    public static void validateTransformColumnCounts(
+    public static void validateTransformColumn(
             @Nullable Schema currentSchema, Schema upcomingSchema) {
         if (currentSchema != null) {
             checkState(
@@ -193,6 +194,28 @@ public class SchemaMergingUtils {
                     String.format(
                             "Unable to merge schema %s and %s with different column counts.",
                             currentSchema, upcomingSchema));
+            List<Column> currentColumns = currentSchema.getColumns();
+            List<Column> upcomingColumns = upcomingSchema.getColumns();
+            IntStream.range(0, currentColumns.size())
+                    .forEach(
+                            i -> {
+                                Column currentColumn = currentColumns.get(i);
+                                Column upcomingColumn = upcomingColumns.get(i);
+                                checkState(
+                                        Objects.equals(
+                                                currentColumn.getName(), upcomingColumn.getName()),
+                                        String.format(
+                                                "Unable to merge column %s and %s with different name.",
+                                                currentColumn, upcomingColumn));
+                                checkState(
+                                        Objects.equals(
+                                                currentColumn.getComment(),
+                                                upcomingColumn.getComment()),
+                                        String.format(
+                                                "Unable to merge column %s and %s with different comments.",
+                                                currentColumn, upcomingColumn));
+                            });
         }
     }
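
For orientation, here is a minimal sketch of how the strengthened check behaves at a call site. It is illustrative only and not part of this commit; the Schema.newBuilder()/physicalColumn builder and DataTypes factory calls are assumed from the Flink CDC common API.

// Illustrative sketch (not part of the diff); assumes the
// org.apache.flink.cdc.common.schema.Schema builder and DataTypes factory APIs.
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;

import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.validateTransformColumn;

public class ValidateTransformColumnSketch {
    public static void main(String[] args) {
        Schema current =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.VARCHAR(17))
                        .build();
        Schema upcoming =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("age", DataTypes.INT())
                        .build();

        // Column counts match, but the second columns are named differently, so the
        // new per-column check is expected to throw IllegalStateException:
        // "Unable to merge column `name` VARCHAR(17) and `age` INT with different name."
        validateTransformColumn(current, upcoming);
    }
}

Because the existing column-count check runs first, the per-index loop added above can safely index both column lists by the same position.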

@@ -67,7 +67,7 @@ import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.getLeastCommo
 import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.getSchemaDifference;
 import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.isDataTypeCompatible;
 import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.isSchemaCompatible;
-import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.validateTransformColumnCounts;
+import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.validateTransformColumn;

 /** A test for the {@link SchemaMergingUtils}. */
 class SchemaMergingUtilsTest {
@@ -1118,15 +1118,24 @@ class SchemaMergingUtilsTest {
     }

     @Test
-    void testTransformColumnCounts() {
+    void testTransformColumn() {
         Assertions.assertThatCode(
                         () ->
-                                validateTransformColumnCounts(
+                                validateTransformColumn(
                                         of("id", BIGINT, "name", VARCHAR(17)), of("id", BIGINT)))
                 .as("test different column counts compatibility")
                 .hasMessage(
                         "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(17)}, primaryKeys=, options=() "
                                 + "and columns={`id` BIGINT}, primaryKeys=, options=() with different column counts.");
+
+        Assertions.assertThatCode(
+                        () ->
+                                validateTransformColumn(
+                                        of("id", BIGINT, "name", VARCHAR(17)),
+                                        of("id", BIGINT, "age", INT)))
+                .as("test different column name compatibility")
+                .hasMessage(
+                        "Unable to merge column `name` VARCHAR(17) and `age` INT with different name.");
     }

     private static void assertTypeMergingVector(DataType incomingType, List<DataType> resultTypes) {

@@ -406,6 +406,40 @@ class FlinkPipelineTransformITCase {
                                 + "and columns={`id` BIGINT NOT NULL,`name` STRING}, primaryKeys=id, options=() with different column counts.");
     }

+    @ParameterizedTest
+    @EnumSource
+    void testMultiTransformColumnNameCompatibility(ValuesDataSink.SinkApi sinkApi) {
+        assertThatThrownBy(
+                        () ->
+                                runGenericTransformTest(
+                                        sinkApi,
+                                        Arrays.asList(
+                                                new TransformDef(
+                                                        "default_namespace.default_schema.mytable2",
+                                                        "id,age",
+                                                        "age < 18",
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null),
+                                                new TransformDef(
+                                                        "default_namespace.default_schema.mytable2",
+                                                        // only references part of the columns
+                                                        "id,UPPER(name) AS name",
+                                                        "age >= 18",
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null)),
+                                        Collections.emptyList()))
+                .rootCause()
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Unable to merge column `age` TINYINT and `name` STRING with different name.");
+    }
+
     @ParameterizedTest
     @EnumSource
     void testMultiTransformMetaSchemaCompatibility(ValuesDataSink.SinkApi sinkApi) {
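
A note on the pipeline-level test added above: both transform rules target default_namespace.default_schema.mytable2 but project different column lists ("id,age" versus "id,UPPER(name) AS name"), so the per-rule schemas agree on column count yet disagree on the second column (`age` TINYINT versus `name` STRING). The per-column name check introduced in SchemaMergingUtils now rejects that combination with the IllegalStateException asserted here.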
