validate meta schema in Multi Transform

Branch: pull/3865/head
Author: MOBIN-F, 3 weeks ago
Parent: 3e16a66972
Commit: 4ef3316a33

@@ -73,11 +73,14 @@ import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
/**
* Utils for merging {@link Schema}s and {@link DataType}s. Prefer using this over {@link
* SchemaUtils} to get consistent schema merging behaviors.
@@ -169,20 +172,30 @@ public class SchemaMergingUtils {
}
/** Merge compatible schemas. */
public static Schema getCommonSchema(List<Schema> schemas) {
if (schemas.isEmpty()) {
return null;
} else if (schemas.size() == 1) {
return schemas.get(0);
public static Schema getCommonSchema(LinkedHashSet<Schema> schemas) {
if (schemas.size() == 1) {
return schemas.iterator().next();
} else {
Schema outputSchema = null;
for (Schema schema : schemas) {
validateTransformColumnCounts(outputSchema, schema);
outputSchema = getLeastCommonSchema(outputSchema, schema);
}
return outputSchema;
}
}
public static void validateTransformColumnCounts(
@Nullable Schema currentSchema, Schema upcomingSchema) {
if (currentSchema != null) {
checkState(
currentSchema.getColumnCount() == upcomingSchema.getColumnCount(),
String.format(
"Unable to merge schema %s and %s with different column counts.",
currentSchema, upcomingSchema));
}
}
/**
* Generating what schema change events we need to do by converting compatible {@code
* beforeSchema} to {@code afterSchema}.

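For context (not part of this commit): with the new LinkedHashSet-based signature, getCommonSchema fails fast when two projected schemas disagree on column count instead of attempting a partial merge. A minimal sketch of that behavior, assuming the flink-cdc-common classes referenced throughout this diff (Schema, DataTypes, SchemaMergingUtils) are on the classpath:

import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;

import java.util.Arrays;
import java.util.LinkedHashSet;

public class ColumnCountCheckSketch {
    public static void main(String[] args) {
        // Two projected schemas for the same table, declaring different column counts.
        Schema wide =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.STRING())
                        .build();
        Schema narrow = Schema.newBuilder().physicalColumn("id", DataTypes.BIGINT()).build();

        // The LinkedHashSet keeps the declaration order of the transform rules
        // and drops exact duplicates before merging.
        LinkedHashSet<Schema> schemas = new LinkedHashSet<>(Arrays.asList(wide, narrow));

        // Expected to throw IllegalStateException ("... with different column counts.")
        // via validateTransformColumnCounts instead of returning a partially merged schema.
        SchemaMergingUtils.getCommonSchema(schemas);
    }
}
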
@@ -41,6 +41,7 @@ import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -49,6 +50,8 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
/** Utils for {@link Schema} to perform the ability of evolution. */
@PublicEvolving
public class SchemaUtils {
@@ -452,6 +455,43 @@ public class SchemaUtils {
return lSchema.copy(mergedColumns);
}
public static void validateMetaSchemaCompatibility(LinkedHashSet<Schema> schemas) {
if (schemas.size() > 1) {
Schema outputSchema = null;
for (Schema schema : schemas) {
isMetaSchemaCompatible(outputSchema, schema);
outputSchema = schema;
}
}
}
public static void isMetaSchemaCompatible(
@Nullable Schema currentSchema, Schema upcomingSchema) {
if (currentSchema == null) {
return;
}
checkState(
currentSchema.primaryKeys().equals(upcomingSchema.primaryKeys()),
String.format(
"Unable to merge schema %s and %s with different primary keys.",
currentSchema, upcomingSchema));
checkState(
currentSchema.partitionKeys().equals(upcomingSchema.partitionKeys()),
String.format(
"Unable to merge schema %s and %s with different partition keys.",
currentSchema, upcomingSchema));
checkState(
currentSchema.options().equals(upcomingSchema.options()),
String.format(
"Unable to merge schema %s and %s with different options.",
currentSchema, upcomingSchema));
checkState(
Objects.equals(currentSchema.comment(), upcomingSchema.comment()),
String.format(
"Unable to merge schema %s and %s with different comments.",
currentSchema, upcomingSchema));
}
/**
* Try to combine two columns with potential incompatible type.
*

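A usage sketch (illustrative only, mirroring the unit test added further below): the new validateMetaSchemaCompatibility rejects schema sets that agree on columns but disagree on meta data such as the primary key.

import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

import java.util.Arrays;
import java.util.LinkedHashSet;

public class MetaSchemaCheckSketch {
    public static void main(String[] args) {
        Schema keyedByCol1 =
                Schema.newBuilder()
                        .physicalColumn("col1", DataTypes.STRING())
                        .physicalColumn("col2", DataTypes.STRING())
                        .primaryKey("col1")
                        .build();
        Schema keyedByCol2 =
                Schema.newBuilder()
                        .physicalColumn("col1", DataTypes.STRING())
                        .physicalColumn("col2", DataTypes.STRING())
                        .primaryKey("col2")
                        .build();

        // Throws IllegalStateException: the two schemas declare different primary keys.
        // Partition keys, options and comments are checked the same way.
        SchemaUtils.validateMetaSchemaCompatibility(
                new LinkedHashSet<>(Arrays.asList(keyedByCol1, keyedByCol2)));
    }
}
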
@@ -67,6 +67,7 @@ import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.getLeastCommo
import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.getSchemaDifference;
import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.isDataTypeCompatible;
import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.isSchemaCompatible;
import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.validateTransformColumnCounts;
/** A test for the {@link SchemaMergingUtils}. */
class SchemaMergingUtilsTest {
@@ -1116,6 +1117,18 @@ class SchemaMergingUtilsTest {
MAP));
}
@Test
void testTransformColumnCounts() {
Assertions.assertThatCode(
() ->
validateTransformColumnCounts(
of("id", BIGINT, "name", VARCHAR(17)), of("id", BIGINT)))
.as("test different column counts compatibility")
.hasMessage(
"Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(17)}, primaryKeys=, options=() "
+ "and columns={`id` BIGINT}, primaryKeys=, options=() with different column counts.");
}
private static void assertTypeMergingVector(DataType incomingType, List<DataType> resultTypes) {
Assertions.assertThat(ALL_TYPES)
.map(type -> getLeastCommonType(type, incomingType))

@@ -33,9 +33,12 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import static org.apache.flink.cdc.common.utils.SchemaUtils.validateMetaSchemaCompatibility;
/** A test for the {@link org.apache.flink.cdc.common.utils.SchemaUtils}. */
public class SchemaUtilsTest {
@@ -484,4 +487,68 @@ public class SchemaUtilsTest {
.build()))
.isExactlyInstanceOf(IllegalStateException.class);
}
@Test
void testMetaSchemaCompatibility() {
Assertions.assertThatCode(
() ->
validateMetaSchemaCompatibility(
of(
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.build(),
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.build())))
.as("test same schema")
.doesNotThrowAnyException();
Assertions.assertThatThrownBy(
() ->
validateMetaSchemaCompatibility(
of(
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build(),
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col2")
.build())))
.isExactlyInstanceOf(IllegalStateException.class)
.as("test primary key conflict")
.hasMessage(
"Unable to merge schema columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=() "
+ "and columns={`col1` STRING,`col2` STRING}, primaryKeys=col2, options=() with different primary keys.");
Assertions.assertThatThrownBy(
() ->
validateMetaSchemaCompatibility(
of(
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.partitionKey("col1")
.build(),
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.partitionKey("col2")
.build())))
.isExactlyInstanceOf(IllegalStateException.class)
.as("test partition key conflict")
.hasMessage(
"Unable to merge schema columns={`col1` STRING,`col2` STRING}, primaryKeys=, partitionKeys=col1, options=() "
+ "and columns={`col1` STRING,`col2` STRING}, primaryKeys=, partitionKeys=col2, options=() with different partition keys.");
}
private static LinkedHashSet<Schema> of(Schema... args) {
LinkedHashSet<Schema> schemas = new LinkedHashSet<>(args.length);
schemas.addAll(Arrays.asList(args));
return schemas;
}
}

@@ -457,9 +457,9 @@ class FlinkPipelineComposerITCase {
"default_namespace.default_schema.table1",
"*,concat(col1,'2') as col12",
"col1 = '2'",
null,
null,
null,
"col1",
"col12",
"key1=value1",
"",
null);
// Setup pipeline

@@ -499,9 +499,9 @@ class FlinkPipelineComposerLenientITCase {
"default_namespace.default_schema.table1",
"*,concat(col1,'2') as col12",
"col1 = '2'",
null,
null,
null,
"col1",
"col12",
"key1=value1",
"",
null);
// Setup pipeline

@@ -373,26 +373,21 @@ class FlinkPipelineTransformITCase {
@ParameterizedTest
@EnumSource
@Disabled("to be fixed in FLINK-37132")
void testMultiTransformSchemaColumnsCompatibilityWithNullProjection(
ValuesDataSink.SinkApi sinkApi) {
TransformDef nullProjection =
new TransformDef(
"default_namespace.default_schema.mytable2",
null,
"age < 18",
null,
null,
null,
null,
null);
void testMultiTransformColumnCountsCompatibility(ValuesDataSink.SinkApi sinkApi) {
assertThatThrownBy(
() ->
runGenericTransformTest(
sinkApi,
Arrays.asList(
nullProjection,
new TransformDef(
"default_namespace.default_schema.mytable2",
null,
"age < 18",
null,
null,
null,
null,
null),
new TransformDef(
"default_namespace.default_schema.mytable2",
// reference part column
@@ -407,39 +402,33 @@ class FlinkPipelineTransformITCase {
.rootCause()
.isExactlyInstanceOf(IllegalStateException.class)
.hasMessage(
"Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+ "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts.");
"Unable to merge schema columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+ "and columns={`id` BIGINT NOT NULL,`name` STRING}, primaryKeys=id, options=() with different column counts.");
}
@ParameterizedTest
@EnumSource
@Disabled("to be fixed in FLINK-37132")
void testMultiTransformSchemaColumnsCompatibilityWithEmptyProjection(
ValuesDataSink.SinkApi sinkApi) {
TransformDef emptyProjection =
new TransformDef(
"default_namespace.default_schema.mytable2",
"",
"age < 18",
null,
null,
null,
null,
null);
void testMultiTransformMetaSchemaCompatibility(ValuesDataSink.SinkApi sinkApi) {
assertThatThrownBy(
() ->
runGenericTransformTest(
sinkApi,
Arrays.asList(
emptyProjection,
new TransformDef(
"default_namespace.default_schema.mytable2",
// reference part column
"id, name",
"age < 18",
null,
"age",
null,
null,
null),
new TransformDef(
"default_namespace.default_schema.mytable2",
"id,UPPER(name) AS name",
"age >= 18",
null,
null,
"id",
null,
null,
null)),
@@ -447,8 +436,8 @@ class FlinkPipelineTransformITCase {
.rootCause()
.isExactlyInstanceOf(IllegalStateException.class)
.hasMessage(
"Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+ "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts.");
"Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, partitionKeys=age, options=() "
+ "and columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, partitionKeys=id, options=() with different partition keys.");
}
@ParameterizedTest

@@ -53,6 +53,7 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -351,7 +352,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
}
private Schema transformSchema(TableId tableId, Schema schema) {
List<Schema> newSchemas = new ArrayList<>();
LinkedHashSet<Schema> newSchemas = new LinkedHashSet<>();
for (PostTransformer transform : transforms) {
Selectors selectors = transform.getSelectors();
if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {

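Why a LinkedHashSet rather than a List here: several transform rules may match the same table and project it to an identical schema, and the set collapses those duplicates while keeping the first-seen order that the later merge in getCommonSchema relies on. A small illustration (not from this commit, and assuming Schema's value-based equals/hashCode, which this change itself depends on):

import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.LinkedHashSet;

public class ProjectedSchemaDedupSketch {
    public static void main(String[] args) {
        Schema projectedByRuleA =
                Schema.newBuilder().physicalColumn("id", DataTypes.BIGINT()).build();
        // A second matching rule that happens to project the table to the same shape.
        Schema projectedByRuleB =
                Schema.newBuilder().physicalColumn("id", DataTypes.BIGINT()).build();

        LinkedHashSet<Schema> newSchemas = new LinkedHashSet<>();
        newSchemas.add(projectedByRuleA);
        newSchemas.add(projectedByRuleB); // equal schema, so the set still holds one element

        System.out.println(newSchemas.size()); // 1: duplicates collapse, insertion order is kept
    }
}
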
@@ -52,6 +52,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -71,6 +72,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
private transient List<PreTransformer> transforms;
private final Map<TableId, PreTransformChangeInfo> preTransformChangeInfoMap;
private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers;
private final LinkedHashMap<Selectors, SchemaMetadataTransform> schemaMetadataTransformersLink;
private transient ListState<byte[]> state;
private final List<Tuple3<String, String, Map<String, String>>> udfFunctions;
private List<UserDefinedFunctionDescriptor> udfDescriptors;
@@ -152,6 +154,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
this.preTransformChangeInfoMap = new ConcurrentHashMap<>();
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.schemaMetadataTransformers = new ArrayList<>();
this.schemaMetadataTransformersLink = new LinkedHashMap<>();
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.transformRules = transformRules;
@@ -191,6 +194,9 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
new Tuple2<>(
selectors,
new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions)));
schemaMetadataTransformersLink.put(
selectors,
new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions));
}
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.hasAsteriskMap = new ConcurrentHashMap<>();
@ -362,18 +368,16 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
TableId tableId = createTableEvent.tableId();
for (Tuple2<Selectors, SchemaMetadataTransform> transform : schemaMetadataTransformers) {
Selectors selectors = transform.f0;
if (selectors.isMatch(tableId)) {
createTableEvent =
new CreateTableEvent(
tableId,
transformSchemaMetaData(
createTableEvent.getSchema(), transform.f1));
}
}
cachePreTransformProcessor(tableId, createTableEvent.getSchema());
Schema originalSchema = createTableEvent.getSchema();
LinkedHashSet<Schema> newSchemas =
schemaMetadataTransformers.stream()
.filter(transform -> transform.f0.isMatch(tableId))
.map(transform -> transformSchemaMetaData(originalSchema, transform.f1))
.collect(Collectors.toCollection(LinkedHashSet::new));
SchemaUtils.validateMetaSchemaCompatibility(newSchemas);
Schema commonSchema = newSchemas.isEmpty() ? originalSchema : newSchemas.iterator().next();
createTableEvent = new CreateTableEvent(tableId, commonSchema);
cachePreTransformProcessor(tableId, commonSchema);
if (preTransformProcessorMap.containsKey(tableId)) {
return preTransformProcessorMap
.get(tableId)

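To summarize the reworked transformCreateTableEvent above: every matching metadata transform now produces a candidate schema, the candidates are collected into an order-preserving set, checked with validateMetaSchemaCompatibility, and only then does the first candidate (or the untouched original schema, if no rule matches) feed the CreateTableEvent and the processor cache. A condensed sketch of that pattern, using plain UnaryOperator lambdas as hypothetical stand-ins for the real SchemaMetadataTransform rules:

import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class CreateTableValidationSketch {
    public static void main(String[] args) {
        Schema originalSchema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.STRING())
                        .build();

        // Hypothetical stand-ins for the matching SchemaMetadataTransform rules;
        // both leave the metadata untouched, so validation passes here.
        UnaryOperator<Schema> rule1 = schema -> schema;
        UnaryOperator<Schema> rule2 = schema -> schema;
        List<UnaryOperator<Schema>> matchingRules = Arrays.asList(rule1, rule2);

        LinkedHashSet<Schema> candidates =
                matchingRules.stream()
                        .map(rule -> rule.apply(originalSchema))
                        .collect(Collectors.toCollection(LinkedHashSet::new));

        // Fails fast with IllegalStateException if the rules disagree on primary keys,
        // partition keys, options or comments; otherwise the first candidate wins.
        SchemaUtils.validateMetaSchemaCompatibility(candidates);
        Schema chosen = candidates.isEmpty() ? originalSchema : candidates.iterator().next();
        System.out.println(chosen);
    }
}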