Replace Preconditions.checkState with a dedicated checked SchemaUtils.SchemaValidationException in schema merge/validation utilities; widen LinkedHashSet parameters to Collection; fix misspelled test method name (ColumName -> ColumnName); update tests to assert the new root cause.

pull/3865/head
MOBIN-F 1 week ago
parent 440d1d28f1
commit 866f0abe59

@@ -71,16 +71,13 @@ import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
/**
* Utils for merging {@link Schema}s and {@link DataType}s. Prefer using this over {@link
@@ -173,13 +170,17 @@ public class SchemaMergingUtils {
}
/** Merge compatible schemas. */
public static Schema getCommonSchema(LinkedHashSet<Schema> schemas) {
public static Schema getCommonSchema(Collection<Schema> schemas) {
if (schemas.size() == 1) {
return schemas.iterator().next();
} else {
Schema outputSchema = null;
for (Schema schema : schemas) {
validateTransformColumn(outputSchema, schema);
try {
validateTransformColumn(outputSchema, schema);
} catch (SchemaUtils.SchemaValidationException e) {
throw new IllegalStateException("Schema validation failed.", e);
}
outputSchema = getLeastCommonSchema(outputSchema, schema);
}
return outputSchema;
@@ -187,35 +188,34 @@ public class SchemaMergingUtils {
}
public static void validateTransformColumn(
@Nullable Schema currentSchema, Schema upcomingSchema) {
@Nullable Schema currentSchema, Schema upcomingSchema)
throws SchemaUtils.SchemaValidationException {
if (currentSchema != null) {
checkState(
currentSchema.getColumnCount() == upcomingSchema.getColumnCount(),
String.format(
"Unable to merge schema %s and %s with different column counts.",
currentSchema, upcomingSchema));
if (currentSchema.getColumnCount() != upcomingSchema.getColumnCount()) {
throw new SchemaUtils.SchemaValidationException(
String.format(
"Unable to merge schema %s and %s with different column counts.",
currentSchema, upcomingSchema));
}
List<Column> currentColumns = currentSchema.getColumns();
List<Column> upcomingColumns = upcomingSchema.getColumns();
IntStream.range(0, currentColumns.size())
.forEach(
i -> {
Column currentColumn = currentColumns.get(i);
Column upcomingColumn = upcomingColumns.get(i);
checkState(
Objects.equals(
currentColumn.getName(), upcomingColumn.getName()),
String.format(
"Unable to merge column %s and %s with different name.",
currentColumn, upcomingColumn));
checkState(
Objects.equals(
currentColumn.getComment(),
upcomingColumn.getComment()),
String.format(
"Unable to merge column %s and %s with different comments.",
currentColumn, upcomingColumn));
});
for (int i = 0; i < currentColumns.size(); i++) {
Column currentColumn = currentColumns.get(i);
Column upcomingColumn = upcomingColumns.get(i);
if (!Objects.equals(currentColumn.getName(), upcomingColumn.getName())) {
throw new SchemaUtils.SchemaValidationException(
String.format(
"Unable to merge column %s and %s with different name.",
currentColumn, upcomingColumn));
}
if (!Objects.equals(currentColumn.getComment(), upcomingColumn.getComment())) {
throw new SchemaUtils.SchemaValidationException(
String.format(
"Unable to merge column %s and %s with different comments.",
currentColumn, upcomingColumn));
}
}
}
}

@@ -41,7 +41,7 @@ import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -50,8 +50,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
/** Utils for {@link Schema} to perform the ability of evolution. */
@PublicEvolving
public class SchemaUtils {
@@ -455,41 +453,53 @@ public class SchemaUtils {
return lSchema.copy(mergedColumns);
}
public static void validateMetaSchemaCompatibility(LinkedHashSet<Schema> schemas) {
public static void validateMetaSchemaCompatibility(Collection<Schema> schemas) {
if (schemas.size() > 1) {
Schema outputSchema = null;
for (Schema schema : schemas) {
validateMetaSchemaCompatible(outputSchema, schema);
try {
validateMetaSchemaCompatible(outputSchema, schema);
} catch (SchemaValidationException e) {
throw new IllegalStateException("Schema validation failed.", e);
}
outputSchema = schema;
}
}
}
public static void validateMetaSchemaCompatible(
@Nullable Schema currentSchema, Schema upcomingSchema) {
@Nullable Schema currentSchema, Schema upcomingSchema)
throws SchemaValidationException {
if (currentSchema == null) {
return;
}
checkState(
currentSchema.primaryKeys().equals(upcomingSchema.primaryKeys()),
String.format(
"Unable to merge schema %s and %s with different primary keys.",
currentSchema, upcomingSchema));
checkState(
currentSchema.partitionKeys().equals(upcomingSchema.partitionKeys()),
String.format(
"Unable to merge schema %s and %s with different partition keys.",
currentSchema, upcomingSchema));
checkState(
currentSchema.options().equals(upcomingSchema.options()),
String.format(
"Unable to merge schema %s and %s with different options.",
currentSchema, upcomingSchema));
checkState(
Objects.equals(currentSchema.comment(), upcomingSchema.comment()),
String.format(
"Unable to merge schema %s and %s with different comments.",
currentSchema, upcomingSchema));
if (!currentSchema.primaryKeys().equals(upcomingSchema.primaryKeys())) {
throw new SchemaValidationException(
String.format(
"Unable to merge schema %s and %s with different primary keys.",
currentSchema, upcomingSchema));
}
if (!currentSchema.partitionKeys().equals(upcomingSchema.partitionKeys())) {
throw new SchemaValidationException(
String.format(
"Unable to merge schema %s and %s with different partition keys.",
currentSchema, upcomingSchema));
}
if (!currentSchema.options().equals(upcomingSchema.options())) {
throw new SchemaValidationException(
String.format(
"Unable to merge schema %s and %s with different options.",
currentSchema, upcomingSchema));
}
if (!Objects.equals(currentSchema.comment(), upcomingSchema.comment())) {
throw new SchemaValidationException(
String.format(
"Unable to merge schema %s and %s with different comments.",
currentSchema, upcomingSchema));
}
}
/**
@@ -629,4 +639,11 @@ public class SchemaUtils {
throw new IllegalArgumentException(
"Failed to get precision of non-exact decimal type " + dataType);
}
/** Thrown to indicate that schema validation has failed due to incompatible schema. */
public static class SchemaValidationException extends Exception {
public SchemaValidationException(String message) {
super(message);
}
}
}

@@ -519,7 +519,8 @@ public class SchemaUtilsTest {
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col2")
.build())))
.isExactlyInstanceOf(IllegalStateException.class)
.rootCause()
.isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
.as("test primary key conflict")
.hasMessage(
"Unable to merge schema columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=() "
@@ -539,7 +540,8 @@ public class SchemaUtilsTest {
.physicalColumn("col2", DataTypes.STRING())
.partitionKey("col2")
.build())))
.isExactlyInstanceOf(IllegalStateException.class)
.rootCause()
.isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
.as("test partition key conflict")
.hasMessage(
"Unable to merge schema columns={`col1` STRING,`col2` STRING}, primaryKeys=, partitionKeys=col1, options=() "

@@ -35,6 +35,7 @@ import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
@@ -400,7 +401,7 @@ class FlinkPipelineTransformITCase {
null)),
Collections.emptyList()))
.rootCause()
.isExactlyInstanceOf(IllegalStateException.class)
.isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
.hasMessage(
"Unable to merge schema columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+ "and columns={`id` BIGINT NOT NULL,`name` STRING}, primaryKeys=id, options=() with different column counts.");
@@ -408,7 +409,7 @@ class FlinkPipelineTransformITCase {
@ParameterizedTest
@EnumSource
void testMultiTransformColumNameCompatibility(ValuesDataSink.SinkApi sinkApi) {
void testMultiTransformColumnNameCompatibility(ValuesDataSink.SinkApi sinkApi) {
assertThatThrownBy(
() ->
runGenericTransformTest(
@@ -435,7 +436,7 @@ class FlinkPipelineTransformITCase {
null)),
Collections.emptyList()))
.rootCause()
.isExactlyInstanceOf(IllegalStateException.class)
.isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
.hasMessage(
"Unable to merge column `age` TINYINT and `name` STRING with different name.");
}
@@ -468,7 +469,7 @@ class FlinkPipelineTransformITCase {
null)),
Collections.emptyList()))
.rootCause()
.isExactlyInstanceOf(IllegalStateException.class)
.isExactlyInstanceOf(SchemaUtils.SchemaValidationException.class)
.hasMessage(
"Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, partitionKeys=age, options=() "
+ "and columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, partitionKeys=id, options=() with different partition keys.");

Loading…
Cancel
Save