diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index 85ddaf410..2cd503f00 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -78,6 +78,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
@@ -178,14 +179,14 @@ public class SchemaMergingUtils {
         } else {
             Schema outputSchema = null;
             for (Schema schema : schemas) {
-                validateTransformColumnCounts(outputSchema, schema);
+                validateTransformColumn(outputSchema, schema);
                 outputSchema = getLeastCommonSchema(outputSchema, schema);
             }
             return outputSchema;
         }
     }
 
-    public static void validateTransformColumnCounts(
+    public static void validateTransformColumn(
             @Nullable Schema currentSchema, Schema upcomingSchema) {
         if (currentSchema != null) {
             checkState(
@@ -193,6 +194,28 @@ public class SchemaMergingUtils {
                     String.format(
                             "Unable to merge schema %s and %s with different column counts.",
                             currentSchema, upcomingSchema));
+
+            List<Column> currentColumns = currentSchema.getColumns();
+            List<Column> upcomingColumns = upcomingSchema.getColumns();
+            IntStream.range(0, currentColumns.size())
+                    .forEach(
+                            i -> {
+                                Column currentColumn = currentColumns.get(i);
+                                Column upcomingColumn = upcomingColumns.get(i);
+                                checkState(
+                                        Objects.equals(
+                                                currentColumn.getName(), upcomingColumn.getName()),
+                                        String.format(
+                                                "Unable to merge column %s and %s with different name.",
+                                                currentColumn, upcomingColumn));
+                                checkState(
+                                        Objects.equals(
+                                                currentColumn.getComment(),
+                                                upcomingColumn.getComment()),
+                                        String.format(
+                                                "Unable to merge column %s and %s with different comments.",
+                                                currentColumn, upcomingColumn));
+                            });
         }
     }
 
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
index 89030a992..9eeb9cf08 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
@@ -67,7 +67,7 @@ import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.getLeastCommo
 import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.getSchemaDifference;
 import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.isDataTypeCompatible;
 import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.isSchemaCompatible;
-import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.validateTransformColumnCounts;
+import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.validateTransformColumn;
 
 /** A test for the {@link SchemaMergingUtils}. */
 class SchemaMergingUtilsTest {
@@ -1118,15 +1118,24 @@ class SchemaMergingUtilsTest {
     }
 
     @Test
-    void testTransformColumnCounts() {
+    void testTransformColumn() {
         Assertions.assertThatCode(
                         () ->
-                                validateTransformColumnCounts(
+                                validateTransformColumn(
                                         of("id", BIGINT, "name", VARCHAR(17)), of("id", BIGINT)))
                 .as("test different column counts compatibility")
                 .hasMessage(
                         "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(17)}, primaryKeys=, options=() "
                                 + "and columns={`id` BIGINT}, primaryKeys=, options=() with different column counts.");
+
+        Assertions.assertThatCode(
+                        () ->
+                                validateTransformColumn(
+                                        of("id", BIGINT, "name", VARCHAR(17)),
+                                        of("id", BIGINT, "age", INT)))
+                .as("test different column name compatibility")
+                .hasMessage(
+                        "Unable to merge column `name` VARCHAR(17) and `age` INT with different name.");
     }
 
     private static void assertTypeMergingVector(DataType incomingType, List<DataType> resultTypes) {
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 0a0180407..50607bd3b 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -406,6 +406,40 @@ class FlinkPipelineTransformITCase {
                                 + "and columns={`id` BIGINT NOT NULL,`name` STRING}, primaryKeys=id, options=() with different column counts.");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testMultiTransformColumnNameCompatibility(ValuesDataSink.SinkApi sinkApi) {
+        assertThatThrownBy(
+                        () ->
+                                runGenericTransformTest(
+                                        sinkApi,
+                                        Arrays.asList(
+                                                new TransformDef(
+                                                        "default_namespace.default_schema.mytable2",
+                                                        "id,age",
+                                                        "age < 18",
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null),
+                                                new TransformDef(
+                                                        "default_namespace.default_schema.mytable2",
+                                                        // projects a different column set than the first transform
+                                                        "id,UPPER(name) AS name",
+                                                        "age >= 18",
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null)),
+                                        Collections.emptyList()))
+                .rootCause()
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Unable to merge column `age` TINYINT and `name` STRING with different name.");
+    }
+
     @ParameterizedTest
     @EnumSource
     void testMultiTransformMetaSchemaCompatibility(ValuesDataSink.SinkApi sinkApi) {
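
Reviewer note (not part of the patch): a minimal sketch of what the new per-column validation rejects. The class name and schema values below are illustrative only, and it assumes the Schema.newBuilder()/DataTypes builder APIs from flink-cdc-common together with the validateTransformColumn method introduced above.

// Hypothetical standalone sketch, not included in the diff: two schemas with the same
// column count but different column names at index 1, which the new check now rejects.
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;

public class ValidateTransformColumnSketch {
    public static void main(String[] args) {
        Schema current =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("age", DataTypes.TINYINT())
                        .build();
        Schema upcoming =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.STRING())
                        .build();

        // Column counts match, so the pre-existing count check passes; the added
        // per-column name comparison then throws IllegalStateException with a message like
        // "Unable to merge column `age` TINYINT and `name` STRING with different name."
        SchemaMergingUtils.validateTransformColumn(current, upcoming);
    }
}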