diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index dd93d5ca3..85ddaf410 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -73,11 +73,14 @@ import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
+
 /**
  * Utils for merging {@link Schema}s and {@link DataType}s. Prefer using this over {@link
  * SchemaUtils} to get consistent schema merging behaviors.
@@ -169,20 +172,30 @@ public class SchemaMergingUtils {
     }
 
     /** Merge compatible schemas. */
-    public static Schema getCommonSchema(List<Schema> schemas) {
-        if (schemas.isEmpty()) {
-            return null;
-        } else if (schemas.size() == 1) {
-            return schemas.get(0);
+    public static Schema getCommonSchema(LinkedHashSet<Schema> schemas) {
+        if (schemas.size() == 1) {
+            return schemas.iterator().next();
         } else {
             Schema outputSchema = null;
             for (Schema schema : schemas) {
+                validateTransformColumnCounts(outputSchema, schema);
                 outputSchema = getLeastCommonSchema(outputSchema, schema);
             }
             return outputSchema;
         }
     }
 
+    public static void validateTransformColumnCounts(
+            @Nullable Schema currentSchema, Schema upcomingSchema) {
+        if (currentSchema != null) {
+            checkState(
+                    currentSchema.getColumnCount() == upcomingSchema.getColumnCount(),
+                    String.format(
+                            "Unable to merge schema %s and %s with different column counts.",
+                            currentSchema, upcomingSchema));
+        }
+    }
+
     /**
      * Generating what schema change events we need to do by converting compatible {@code
      * beforeSchema} to {@code afterSchema}.
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 966f0b6d5..846828a79 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -41,6 +41,7 @@ import javax.annotation.CheckReturnValue;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -49,6 +50,8 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
+
 /** Utils for {@link Schema} to perform the ability of evolution. */
 @PublicEvolving
 public class SchemaUtils {
@@ -452,6 +455,43 @@ public class SchemaUtils {
         return lSchema.copy(mergedColumns);
     }
 
+    public static void validateMetaSchemaCompatibility(LinkedHashSet<Schema> schemas) {
+        if (schemas.size() > 1) {
+            Schema outputSchema = null;
+            for (Schema schema : schemas) {
+                isMetaSchemaCompatible(outputSchema, schema);
+                outputSchema = schema;
+            }
+        }
+    }
+
+    public static void isMetaSchemaCompatible(
+            @Nullable Schema currentSchema, Schema upcomingSchema) {
+        if (currentSchema == null) {
+            return;
+        }
+        checkState(
+                currentSchema.primaryKeys().equals(upcomingSchema.primaryKeys()),
+                String.format(
+                        "Unable to merge schema %s and %s with different primary keys.",
+                        currentSchema, upcomingSchema));
+        checkState(
+                currentSchema.partitionKeys().equals(upcomingSchema.partitionKeys()),
+                String.format(
+                        "Unable to merge schema %s and %s with different partition keys.",
+                        currentSchema, upcomingSchema));
+        checkState(
+                currentSchema.options().equals(upcomingSchema.options()),
+                String.format(
+                        "Unable to merge schema %s and %s with different options.",
+                        currentSchema, upcomingSchema));
+        checkState(
+                Objects.equals(currentSchema.comment(), upcomingSchema.comment()),
+                String.format(
+                        "Unable to merge schema %s and %s with different comments.",
+                        currentSchema, upcomingSchema));
+    }
+
     /**
      * Try to combine two columns with potential incompatible type.
      *
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
index c5021d593..89030a992 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
@@ -67,6 +67,7 @@ import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.getLeastCommo
 import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.getSchemaDifference;
 import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.isDataTypeCompatible;
 import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.isSchemaCompatible;
+import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.validateTransformColumnCounts;
 
 /** A test for the {@link SchemaMergingUtils}. */
 class SchemaMergingUtilsTest {
@@ -1116,6 +1117,18 @@ class SchemaMergingUtilsTest {
                         MAP));
     }
 
+    @Test
+    void testTransformColumnCounts() {
+        Assertions.assertThatCode(
+                        () ->
+                                validateTransformColumnCounts(
+                                        of("id", BIGINT, "name", VARCHAR(17)), of("id", BIGINT)))
+                .as("test different column counts compatibility")
+                .hasMessage(
+                        "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(17)}, primaryKeys=, options=() "
+                                + "and columns={`id` BIGINT}, primaryKeys=, options=() with different column counts.");
+    }
+
     private static void assertTypeMergingVector(DataType incomingType, List<DataType> resultTypes) {
         Assertions.assertThat(ALL_TYPES)
                 .map(type -> getLeastCommonType(type, incomingType))
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index 6ff686aec..f5d3454b6 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -33,9 +33,12 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.cdc.common.utils.SchemaUtils.validateMetaSchemaCompatibility;
+
 /** A test for the {@link org.apache.flink.cdc.common.utils.SchemaUtils}. */
 public class SchemaUtilsTest {
@@ -484,4 +487,68 @@ public class SchemaUtilsTest {
                                         .build()))
                 .isExactlyInstanceOf(IllegalStateException.class);
     }
+
+    @Test
+    void testMetaSchemaCompatibility() {
+        Assertions.assertThatCode(
+                        () ->
+                                validateMetaSchemaCompatibility(
+                                        of(
+                                                Schema.newBuilder()
+                                                        .physicalColumn("col1", DataTypes.STRING())
+                                                        .physicalColumn("col2", DataTypes.STRING())
+                                                        .build(),
+                                                Schema.newBuilder()
+                                                        .physicalColumn("col1", DataTypes.STRING())
+                                                        .physicalColumn("col2", DataTypes.STRING())
+                                                        .build())))
+                .as("test same schema")
+                .doesNotThrowAnyException();
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                validateMetaSchemaCompatibility(
+                                        of(
+                                                Schema.newBuilder()
+                                                        .physicalColumn("col1", DataTypes.STRING())
+                                                        .physicalColumn("col2", DataTypes.STRING())
+                                                        .primaryKey("col1")
+                                                        .build(),
+                                                Schema.newBuilder()
+                                                        .physicalColumn("col1", DataTypes.STRING())
+                                                        .physicalColumn("col2", DataTypes.STRING())
+                                                        .primaryKey("col2")
+                                                        .build())))
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .as("test primary key conflict")
+                .hasMessage(
+                        "Unable to merge schema columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=() "
+                                + "and columns={`col1` STRING,`col2` STRING}, primaryKeys=col2, options=() with different primary keys.");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                validateMetaSchemaCompatibility(
+                                        of(
+                                                Schema.newBuilder()
+                                                        .physicalColumn("col1", DataTypes.STRING())
+                                                        .physicalColumn("col2", DataTypes.STRING())
+                                                        .partitionKey("col1")
+                                                        .build(),
+                                                Schema.newBuilder()
+                                                        .physicalColumn("col1", DataTypes.STRING())
+                                                        .physicalColumn("col2", DataTypes.STRING())
+                                                        .partitionKey("col2")
+                                                        .build())))
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .as("test partition key conflict")
+                .hasMessage(
+                        "Unable to merge schema columns={`col1` STRING,`col2` STRING}, primaryKeys=, partitionKeys=col1, options=() "
+                                + "and columns={`col1` STRING,`col2` STRING}, primaryKeys=, partitionKeys=col2, options=() with different partition keys.");
+    }
+
+    private static LinkedHashSet<Schema> of(Schema... args) {
+        LinkedHashSet<Schema> schemas = new LinkedHashSet<>(args.length);
+        schemas.addAll(Arrays.asList(args));
+        return schemas;
+    }
 }
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 55a205d49..ca65df8d7 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -457,9 +457,9 @@ class FlinkPipelineComposerITCase {
                         "default_namespace.default_schema.table1",
                         "*,concat(col1,'2') as col12",
                         "col1 = '2'",
-                        null,
-                        null,
-                        null,
+                        "col1",
+                        "col12",
+                        "key1=value1",
                         "",
                         null);
         // Setup pipeline
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
index ad541f447..bc8a8bafb 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
@@ -499,9 +499,9 @@ class FlinkPipelineComposerLenientITCase {
                         "default_namespace.default_schema.table1",
                         "*,concat(col1,'2') as col12",
                         "col1 = '2'",
-                        null,
-                        null,
-                        null,
+                        "col1",
+                        "col12",
+                        "key1=value1",
                         "",
                         null);
         // Setup pipeline
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 2ac394add..0a0180407 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -373,26 +373,21 @@ class FlinkPipelineTransformITCase {
 
     @ParameterizedTest
     @EnumSource
-    @Disabled("to be fixed in FLINK-37132")
-    void testMultiTransformSchemaColumnsCompatibilityWithNullProjection(
-            ValuesDataSink.SinkApi sinkApi) {
-        TransformDef nullProjection =
-                new TransformDef(
-                        "default_namespace.default_schema.mytable2",
-                        null,
-                        "age < 18",
-                        null,
-                        null,
-                        null,
-                        null,
-                        null);
-
+    void testMultiTransformColumnCountsCompatibility(ValuesDataSink.SinkApi sinkApi) {
         assertThatThrownBy(
                         () ->
                                 runGenericTransformTest(
                                         sinkApi,
                                         Arrays.asList(
-                                                nullProjection,
+                                                new TransformDef(
+                                                        "default_namespace.default_schema.mytable2",
+                                                        null,
+                                                        "age < 18",
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null),
                                                 new TransformDef(
                                                         "default_namespace.default_schema.mytable2",
                                                         // reference part column
@@ -407,39 +402,33 @@ class FlinkPipelineTransformITCase {
                 .rootCause()
                 .isExactlyInstanceOf(IllegalStateException.class)
                 .hasMessage(
-                        "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
-                                + "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts.");
+                        "Unable to merge schema columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+                                + "and columns={`id` BIGINT NOT NULL,`name` STRING}, primaryKeys=id, options=() with different column counts.");
     }
 
     @ParameterizedTest
     @EnumSource
-    @Disabled("to be fixed in FLINK-37132")
-    void testMultiTransformSchemaColumnsCompatibilityWithEmptyProjection(
-            ValuesDataSink.SinkApi sinkApi) {
-        TransformDef emptyProjection =
-                new TransformDef(
-                        "default_namespace.default_schema.mytable2",
-                        "",
-                        "age < 18",
-                        null,
-                        null,
-                        null,
-                        null,
-                        null);
-
+    void testMultiTransformMetaSchemaCompatibility(ValuesDataSink.SinkApi sinkApi) {
         assertThatThrownBy(
                         () ->
                                 runGenericTransformTest(
                                         sinkApi,
                                         Arrays.asList(
-                                                emptyProjection,
                                                 new TransformDef(
                                                         "default_namespace.default_schema.mytable2",
-                                                        // reference part column
+                                                        "id, name",
+                                                        "age < 18",
+                                                        null,
+                                                        "age",
+                                                        null,
+                                                        null,
+                                                        null),
+                                                new TransformDef(
+                                                        "default_namespace.default_schema.mytable2",
                                                         "id,UPPER(name) AS name",
                                                         "age >= 18",
                                                         null,
-                                                        null,
+                                                        "id",
                                                         null,
                                                         null,
                                                         null)),
@@ -447,8 +436,8 @@ class FlinkPipelineTransformITCase {
                 .rootCause()
                 .isExactlyInstanceOf(IllegalStateException.class)
                 .hasMessage(
-                        "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
-                                + "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts.");
+                        "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, partitionKeys=age, options=() "
+                                + "and columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, partitionKeys=id, options=() with different partition keys.");
     }
 
     @ParameterizedTest
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index c7c69fa6c..c4f6bddaf 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -53,6 +53,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -351,7 +352,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
     }
 
     private Schema transformSchema(TableId tableId, Schema schema) {
-        List<Schema> newSchemas = new ArrayList<>();
+        LinkedHashSet<Schema> newSchemas = new LinkedHashSet<>();
         for (PostTransformer transform : transforms) {
             Selectors selectors = transform.getSelectors();
             if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 845fd4df1..82a1deb8f 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -52,6 +52,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +72,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
     private transient List<PreTransformer> transforms;
     private final Map<TableId, PreTransformChangeInfo> preTransformChangeInfoMap;
     private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers;
+    private final LinkedHashMap<Selectors, SchemaMetadataTransform> schemaMetadataTransformersLink;
     private transient ListState<byte[]> state;
     private final List<Tuple2<String, Class<? extends UserDefinedFunction>>> udfFunctions;
     private List<UserDefinedFunctionDescriptor> udfDescriptors;
@@ -152,6 +154,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
         this.preTransformChangeInfoMap = new ConcurrentHashMap<>();
         this.preTransformProcessorMap = new ConcurrentHashMap<>();
         this.schemaMetadataTransformers = new ArrayList<>();
+        this.schemaMetadataTransformersLink = new LinkedHashMap<>();
         this.chainingStrategy = ChainingStrategy.ALWAYS;
 
         this.transformRules = transformRules;
@@ -191,6 +194,9 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
                             new Tuple2<>(
                                     selectors,
                                     new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions)));
+            schemaMetadataTransformersLink.put(
+                    selectors,
+                    new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions));
         }
         this.preTransformProcessorMap = new ConcurrentHashMap<>();
         this.hasAsteriskMap = new ConcurrentHashMap<>();
@@ -362,18 +368,16 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
 
     private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
         TableId tableId = createTableEvent.tableId();
-        for (Tuple2<Selectors, SchemaMetadataTransform> transform : schemaMetadataTransformers) {
-            Selectors selectors = transform.f0;
-            if (selectors.isMatch(tableId)) {
-                createTableEvent =
-                        new CreateTableEvent(
-                                tableId,
-                                transformSchemaMetaData(
-                                        createTableEvent.getSchema(), transform.f1));
-            }
-        }
-
-        cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+        Schema originalSchema = createTableEvent.getSchema();
+        LinkedHashSet<Schema> newSchemas =
+                schemaMetadataTransformers.stream()
+                        .filter(transform -> transform.f0.isMatch(tableId))
+                        .map(transform -> transformSchemaMetaData(originalSchema, transform.f1))
+                        .collect(Collectors.toCollection(LinkedHashSet::new));
+        SchemaUtils.validateMetaSchemaCompatibility(newSchemas);
+        Schema commonSchema = newSchemas.isEmpty() ? originalSchema : newSchemas.iterator().next();
+        createTableEvent = new CreateTableEvent(tableId, commonSchema);
+        cachePreTransformProcessor(tableId, commonSchema);
         if (preTransformProcessorMap.containsKey(tableId)) {
             return preTransformProcessorMap
                     .get(tableId)
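
Not part of the patch: a minimal stand-alone sketch of the merging contract introduced above. Schemas produced by multiple matching transform rules are now collected into an order-preserving, deduplicated LinkedHashSet<Schema>; SchemaMergingUtils.validateTransformColumnCounts rejects schemas with different column counts, and SchemaUtils.validateMetaSchemaCompatibility rejects differing primary keys, partition keys, options, or comments, both via checkState (IllegalStateException). It uses only APIs visible in the diff and its tests; the class name SchemaMergeContractDemo and the example column types are made up for illustration.

import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;

import java.util.LinkedHashSet;

public class SchemaMergeContractDemo {
    public static void main(String[] args) {
        Schema a =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.VARCHAR(17))
                        .build();
        Schema b =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.STRING())
                        .build();

        // LinkedHashSet keeps insertion order and drops exact duplicates, so a single
        // distinct schema short-circuits to schemas.iterator().next() in getCommonSchema.
        LinkedHashSet<Schema> schemas = new LinkedHashSet<>();
        schemas.add(a);
        schemas.add(b);

        // Same column count on both sides: merging resolves VARCHAR(17) vs STRING
        // to their least common type.
        Schema merged = SchemaMergingUtils.getCommonSchema(schemas);
        System.out.println(merged);

        // Meta-schema validation passes here: same (empty) primary keys, partition keys,
        // options, and comments.
        SchemaUtils.validateMetaSchemaCompatibility(schemas);

        // A schema with a different column count now fails fast instead of being merged.
        Schema c = Schema.newBuilder().physicalColumn("id", DataTypes.BIGINT()).build();
        LinkedHashSet<Schema> mismatched = new LinkedHashSet<>(schemas);
        mismatched.add(c);
        try {
            SchemaMergingUtils.getCommonSchema(mismatched);
        } catch (IllegalStateException e) {
            // "... with different column counts."
            System.out.println(e.getMessage());
        }
    }
}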