diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index 2cd503f00..c51e22eb4 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -71,16 +71,13 @@ import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
 
 /**
  * Utils for merging {@link Schema}s and {@link DataType}s. Prefer using this over {@link
@@ -173,13 +170,17 @@ public class SchemaMergingUtils {
     }
 
     /** Merge compatible schemas. */
-    public static Schema getCommonSchema(LinkedHashSet<Schema> schemas) {
+    public static Schema getCommonSchema(Collection<Schema> schemas) {
         if (schemas.size() == 1) {
             return schemas.iterator().next();
         } else {
             Schema outputSchema = null;
             for (Schema schema : schemas) {
-                validateTransformColumn(outputSchema, schema);
+                try {
+                    validateTransformColumn(outputSchema, schema);
+                } catch (SchemaUtils.SchemaValidationException e) {
+                    throw new IllegalStateException("Schema validation failed.", e);
+                }
                 outputSchema = getLeastCommonSchema(outputSchema, schema);
             }
             return outputSchema;
         }
@@ -187,35 +188,34 @@ public class SchemaMergingUtils {
     }
 
     public static void validateTransformColumn(
-            @Nullable Schema currentSchema, Schema upcomingSchema) {
+            @Nullable Schema currentSchema, Schema upcomingSchema)
+            throws SchemaUtils.SchemaValidationException {
         if (currentSchema != null) {
-            checkState(
-                    currentSchema.getColumnCount() == upcomingSchema.getColumnCount(),
-                    String.format(
-                            "Unable to merge schema %s and %s with different column counts.",
-                            currentSchema, upcomingSchema));
+            if (currentSchema.getColumnCount() != upcomingSchema.getColumnCount()) {
+                throw new SchemaUtils.SchemaValidationException(
+                        String.format(
+                                "Unable to merge schema %s and %s with different column counts.",
+                                currentSchema, upcomingSchema));
+            }
             List<Column> currentColumns = currentSchema.getColumns();
             List<Column> upcomingColumns = upcomingSchema.getColumns();
-            IntStream.range(0, currentColumns.size())
-                    .forEach(
-                            i -> {
-                                Column currentColumn = currentColumns.get(i);
-                                Column upcomingColumn = upcomingColumns.get(i);
-                                checkState(
-                                        Objects.equals(
-                                                currentColumn.getName(), upcomingColumn.getName()),
-                                        String.format(
-                                                "Unable to merge column %s and %s with different name.",
-                                                currentColumn, upcomingColumn));
-                                checkState(
-                                        Objects.equals(
-                                                currentColumn.getComment(),
-                                                upcomingColumn.getComment()),
-                                        String.format(
-                                                "Unable to merge column %s and %s with different comments.",
-                                                currentColumn, upcomingColumn));
-                            });
+            for (int i = 0; i < currentColumns.size(); i++) {
+                Column currentColumn = currentColumns.get(i);
+                Column upcomingColumn = upcomingColumns.get(i);
+                if (!Objects.equals(currentColumn.getName(), upcomingColumn.getName())) {
+                    throw new SchemaUtils.SchemaValidationException(
+                            String.format(
+                                    "Unable to merge column %s and %s with different name.",
+                                    currentColumn, upcomingColumn));
+                }
+                if (!Objects.equals(currentColumn.getComment(), upcomingColumn.getComment())) {
+                    throw new SchemaUtils.SchemaValidationException(
+                            String.format(
+                                    "Unable to merge column %s and %s with different comments.",
+                                    currentColumn, upcomingColumn));
+                }
+            }
         }
     }
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 72f922ae7..0f48817b1 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -41,7 +41,7 @@ import javax.annotation.CheckReturnValue;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.LinkedHashSet;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -50,8 +50,6 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
-
 /** Utils for {@link Schema} to perform the ability of evolution. */
 @PublicEvolving
 public class SchemaUtils {
@@ -455,41 +453,53 @@ public class SchemaUtils {
         return lSchema.copy(mergedColumns);
     }
 
-    public static void validateMetaSchemaCompatibility(LinkedHashSet<Schema> schemas) {
+    public static void validateMetaSchemaCompatibility(Collection<Schema> schemas) {
         if (schemas.size() > 1) {
             Schema outputSchema = null;
             for (Schema schema : schemas) {
-                validateMetaSchemaCompatible(outputSchema, schema);
+                try {
+                    validateMetaSchemaCompatible(outputSchema, schema);
+                } catch (SchemaValidationException e) {
+                    throw new IllegalStateException("Schema validation failed.", e);
+                }
                 outputSchema = schema;
             }
         }
     }
 
     public static void validateMetaSchemaCompatible(
-            @Nullable Schema currentSchema, Schema upcomingSchema) {
+            @Nullable Schema currentSchema, Schema upcomingSchema)
+            throws SchemaValidationException {
         if (currentSchema == null) {
             return;
         }
-        checkState(
-                currentSchema.primaryKeys().equals(upcomingSchema.primaryKeys()),
-                String.format(
-                        "Unable to merge schema %s and %s with different primary keys.",
-                        currentSchema, upcomingSchema));
-        checkState(
-                currentSchema.partitionKeys().equals(upcomingSchema.partitionKeys()),
-                String.format(
-                        "Unable to merge schema %s and %s with different partition keys.",
-                        currentSchema, upcomingSchema));
-        checkState(
-                currentSchema.options().equals(upcomingSchema.options()),
-                String.format(
-                        "Unable to merge schema %s and %s with different options.",
-                        currentSchema, upcomingSchema));
-        checkState(
-                Objects.equals(currentSchema.comment(), upcomingSchema.comment()),
-                String.format(
-                        "Unable to merge schema %s and %s with different comments.",
-                        currentSchema, upcomingSchema));
+
+        if (!currentSchema.primaryKeys().equals(upcomingSchema.primaryKeys())) {
+            throw new SchemaValidationException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different primary keys.",
+                            currentSchema, upcomingSchema));
+        }
+
+        if (!currentSchema.partitionKeys().equals(upcomingSchema.partitionKeys())) {
+            throw new SchemaValidationException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different partition keys.",
+                            currentSchema, upcomingSchema));
+        }
+        if (!currentSchema.options().equals(upcomingSchema.options())) {
+            throw new SchemaValidationException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different options.",
+                            currentSchema, upcomingSchema));
+        }
+
+        if (!Objects.equals(currentSchema.comment(), upcomingSchema.comment())) {
+            throw new SchemaValidationException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different comments.",
+                            currentSchema, upcomingSchema));
+        }
     }
 
     /**
@@ -629,4 +639,11 @@ public class SchemaUtils {
         throw new IllegalArgumentException(
                 "Failed to get precision of non-exact decimal type " + dataType);
     }
+
+    /** Thrown to indicate that schema validation has failed due to incompatible schema. */
+    public static class SchemaValidationException extends Exception {
+        public SchemaValidationException(String message) {
+            super(message);
+        }
+    }
 }
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index f5d3454b6..8db6d1c36 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -519,7 +519,8 @@
                                                 .physicalColumn("col2", DataTypes.STRING())
                                                 .primaryKey("col2")
                                                 .build())))
-                .isExactlyInstanceOf(IllegalStateException.class)
+                .rootCause()
+                .isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
                 .as("test primary key conflict")
                 .hasMessage(
                         "Unable to merge schema columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=() "
@@ -539,7 +540,8 @@
                                                 .physicalColumn("col2", DataTypes.STRING())
                                                 .partitionKey("col2")
                                                 .build())))
-                .isExactlyInstanceOf(IllegalStateException.class)
+                .rootCause()
+                .isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
                 .as("test partition key conflict")
                 .hasMessage(
                         "Unable to merge schema columns={`col1` STRING,`col2` STRING}, primaryKeys=, partitionKeys=col1, options=() "
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 50607bd3b..ba07e705b 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
 import org.apache.flink.cdc.composer.definition.SinkDef;
@@ -400,7 +401,7 @@
                                         null)),
                         Collections.emptyList()))
                 .rootCause()
-                .isExactlyInstanceOf(IllegalStateException.class)
+                .isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
                 .hasMessage(
                         "Unable to merge schema columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
                                 + "and columns={`id` BIGINT NOT NULL,`name` STRING}, primaryKeys=id, options=() with different column counts.");
     }
@@ -408,7 +409,7 @@
 
     @ParameterizedTest
     @EnumSource
-    void testMultiTransformColumNameCompatibility(ValuesDataSink.SinkApi sinkApi) {
+    void testMultiTransformColumnNameCompatibility(ValuesDataSink.SinkApi sinkApi) {
         assertThatThrownBy(
                 () ->
                         runGenericTransformTest(
@@ -435,7 +436,7 @@
                                         null)),
                         Collections.emptyList()))
                 .rootCause()
-                .isExactlyInstanceOf(IllegalStateException.class)
+                .isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
                 .hasMessage(
                         "Unable to merge column `age` TINYINT and `name` STRING with different name.");
     }
@@ -468,7 +469,7 @@
                                         null)),
                         Collections.emptyList()))
                 .rootCause()
-                .isExactlyInstanceOf(IllegalStateException.class)
+                .isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
                 .hasMessage(
                         "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, partitionKeys=age, options=() "
                                 + "and columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, partitionKeys=id, options=() with different partition keys.");
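
Note for reviewers: after this patch, the pairwise validators (`validateTransformColumn`, `validateMetaSchemaCompatible`) surface the checked `SchemaUtils.SchemaValidationException` directly, while the bulk entry points (`getCommonSchema`, `validateMetaSchemaCompatibility`) keep their unchecked signatures and rethrow it wrapped in an `IllegalStateException` — which is why the test assertions now unwrap via `rootCause()`. A minimal sketch of both call paths follows; the builder calls mirror the test fixtures in this patch, but the `main` harness and the two sample schemas are illustrative only, not part of the change:

```java
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

import java.util.Arrays;

public class SchemaValidationExample {
    public static void main(String[] args) {
        // Two hypothetical schemas that agree on columns but disagree on
        // primary keys, chosen to trip the primary-key compatibility check.
        Schema current =
                Schema.newBuilder()
                        .physicalColumn("col1", DataTypes.STRING())
                        .physicalColumn("col2", DataTypes.STRING())
                        .primaryKey("col1")
                        .build();
        Schema upcoming =
                Schema.newBuilder()
                        .physicalColumn("col1", DataTypes.STRING())
                        .physicalColumn("col2", DataTypes.STRING())
                        .primaryKey("col2")
                        .build();

        // Pairwise validation now throws the checked exception directly,
        // so callers are forced to handle the incompatibility explicitly.
        try {
            SchemaUtils.validateMetaSchemaCompatible(current, upcoming);
        } catch (SchemaUtils.SchemaValidationException e) {
            System.out.println("checked: " + e.getMessage());
        }

        // The bulk entry point accepts any Collection<Schema> (no longer
        // just LinkedHashSet) and wraps the checked exception as the cause
        // of an unchecked IllegalStateException.
        try {
            SchemaUtils.validateMetaSchemaCompatibility(Arrays.asList(current, upcoming));
        } catch (IllegalStateException e) {
            System.out.println("root cause: " + e.getCause().getMessage());
        }
    }
}
```

The wrap-and-rethrow in the bulk paths keeps the public pipeline-facing signatures source-compatible while still giving downstream code a typed cause to match on, instead of the bare `IllegalStateException` that `checkState` used to produce.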