diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java index d5df8eda1..8179fdbff 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -19,6 +19,8 @@ package org.apache.flink.cdc.cli.parser; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.composer.definition.PipelineDef; import org.apache.flink.cdc.composer.definition.RouteDef; @@ -35,11 +37,14 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Stream; +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR; import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions; import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; @@ -99,6 +104,19 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig) throws Exception { + + // UDFs are optional. We parse the UDF node first and remove it from pipelineDefJsonNode, since + // it is not a plain data type and must be removed before calling toPipelineConfig.
+ List<UdfDef> udfDefs = new ArrayList<>(); + Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY)) + .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf)))); + + // Pipeline configs are optional + Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY)); + + SchemaChangeBehavior schemaChangeBehavior = + userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR); + + // Source is required SourceDef sourceDef = toSourceDef( @@ -113,7 +131,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { checkNotNull( pipelineDefJsonNode.get(SINK_KEY), "Missing required field \"%s\" in pipeline definition", - SINK_KEY)); + SINK_KEY), + schemaChangeBehavior); // Transforms are optional List<TransformDef> transformDefs = new ArrayList<>(); @@ -128,14 +147,6 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY)) .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route)))); - // UDFs are optional - List<UdfDef> udfDefs = new ArrayList<>(); - Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY)) - .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf)))); - - // Pipeline configs are optional - Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY)); - - // Merge user config into global config Configuration pipelineConfig = new Configuration(); pipelineConfig.addAll(globalPipelineConfig); @@ -162,7 +173,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { return new SourceDef(type, name, Configuration.fromMap(sourceMap)); } - private SinkDef toSinkDef(JsonNode sinkNode) { + private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) { List<String> includedSETypes = new ArrayList<>(); List<String> excludedSETypes = new ArrayList<>(); @@ -172,6 +183,23 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES)) .ifPresent(e -> e.forEach(tag -> excludedSETypes.add(tag.asText()))); + if (includedSETypes.isEmpty()) { + // If no schema evolution types are specified, include all schema evolution types by + // default. + Arrays.stream(SchemaChangeEventTypeFamily.ALL) + .map(SchemaChangeEventType::getTag) + .forEach(includedSETypes::add); + } + + if (excludedSETypes.isEmpty() + && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) { + // In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default. This could be + // overridden by manually specifying excluded types.
+ Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE) + .map(SchemaChangeEventType::getTag) + .forEach(excludedSETypes::add); + } + Set declaredSETypes = resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes); diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index f57dd62c7..2ecf45870 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.composer.definition.TransformDef; import org.apache.flink.cdc.composer.definition.UdfDef; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet; import org.apache.flink.shaded.guava31.com.google.common.io.Resources; import org.junit.jupiter.api.Test; @@ -37,6 +38,11 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; @@ -384,7 +390,13 @@ class YamlPipelineDefinitionParserTest { Configuration.fromMap( ImmutableMap.builder() .put("bootstrap-servers", "localhost:9092") - .build())), + .build()), + ImmutableSet.of( + DROP_COLUMN, + ALTER_COLUMN_TYPE, + ADD_COLUMN, + CREATE_TABLE, + RENAME_COLUMN)), Collections.singletonList( new RouteDef( "mydb.default.app_order_.*", @@ -401,7 +413,16 @@ class YamlPipelineDefinitionParserTest { private final PipelineDef minimizedDef = new PipelineDef( new SourceDef("mysql", null, new Configuration()), - new SinkDef("kafka", null, new Configuration()), + new SinkDef( + "kafka", + null, + new Configuration(), + ImmutableSet.of( + DROP_COLUMN, + ALTER_COLUMN_TYPE, + ADD_COLUMN, + CREATE_TABLE, + RENAME_COLUMN)), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), @@ -474,7 +495,16 @@ class YamlPipelineDefinitionParserTest { private final PipelineDef pipelineDefWithUdf = new PipelineDef( new SourceDef("values", null, new Configuration()), - new SinkDef("values", null, new Configuration()), + new SinkDef( + "values", + null, + new Configuration(), + ImmutableSet.of( + DROP_COLUMN, + ALTER_COLUMN_TYPE, + ADD_COLUMN, + CREATE_TABLE, + RENAME_COLUMN)), Collections.emptyList(), Collections.singletonList( new TransformDef( diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java index 8132c29a3..bbe4b415c 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java +++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java @@ -22,13 +22,23 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving; /** An enumeration of schema change event types for {@link SchemaChangeEvent}. */ @PublicEvolving public enum SchemaChangeEventType { - ADD_COLUMN, - ALTER_COLUMN_TYPE, - CREATE_TABLE, - DROP_COLUMN, - DROP_TABLE, - RENAME_COLUMN, - TRUNCATE_TABLE; + ADD_COLUMN("add.column"), + ALTER_COLUMN_TYPE("alter.column.type"), + CREATE_TABLE("create.table"), + DROP_COLUMN("drop.column"), + DROP_TABLE("drop.table"), + RENAME_COLUMN("rename.column"), + TRUNCATE_TABLE("truncate.table"); + + private final String tag; + + SchemaChangeEventType(String tag) { + this.tag = tag; + } + + public String getTag() { + return tag; + } public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) { if (event instanceof AddColumnEvent) { diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java index 48a4fbb13..343e99270 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java @@ -48,7 +48,7 @@ public class PipelineOptions { public static final ConfigOption<SchemaChangeBehavior> PIPELINE_SCHEMA_CHANGE_BEHAVIOR = ConfigOptions.key("schema.change.behavior") .enumType(SchemaChangeBehavior.class) - .defaultValue(SchemaChangeBehavior.EVOLVE) + .defaultValue(SchemaChangeBehavior.LENIENT) .withDescription( Description.builder() .text("Behavior for handling schema change events. ") diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java index 940dc3144..483752ce9 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java @@ -90,12 +90,8 @@ public class ChangeEventUtils { List<String> includedSchemaEvolutionTypes, List<String> excludedSchemaEvolutionTypes) { List<SchemaChangeEventType> resultTypes = new ArrayList<>(); - if (includedSchemaEvolutionTypes.isEmpty()) { - resultTypes.addAll(Arrays.asList(SchemaChangeEventTypeFamily.ALL)); - } else { - for (String includeTag : includedSchemaEvolutionTypes) { - resultTypes.addAll(resolveSchemaEvolutionTag(includeTag)); - } + for (String includeTag : includedSchemaEvolutionTypes) { + resultTypes.addAll(resolveSchemaEvolutionTag(includeTag)); } for (String excludeTag : excludedSchemaEvolutionTypes) { diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java index fd3636191..5cdff9c52 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java @@ -17,11 +17,16 @@ package org.apache.flink.cdc.common.utils; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; + import org.assertj.core.util.Sets; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import static
org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; @@ -36,9 +41,12 @@ import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.a public class ChangeEventUtilsTest { @Test public void testResolveSchemaEvolutionOptions() { - assertThat( - ChangeEventUtils.resolveSchemaEvolutionOptions( - Collections.emptyList(), Collections.emptyList())) + + List<String> allTags = + Arrays.stream(SchemaChangeEventTypeFamily.ALL) + .map(SchemaChangeEventType::getTag) + .collect(Collectors.toList()); + assertThat(ChangeEventUtils.resolveSchemaEvolutionOptions(allTags, Collections.emptyList())) .isEqualTo( Sets.set( TRUNCATE_TABLE, @@ -51,7 +59,7 @@ assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( - Collections.emptyList(), Collections.singletonList("drop"))) + allTags, Collections.singletonList("drop"))) .isEqualTo( Sets.set( ADD_COLUMN, @@ -73,7 +81,7 @@ assertThat( ChangeEventUtils.resolveSchemaEvolutionOptions( - Collections.emptyList(), Collections.singletonList("drop.column"))) + allTags, Collections.singletonList("drop.column"))) .isEqualTo( Sets.set( ADD_COLUMN, diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 5c2d48fe5..81d466aab 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; @@ -134,6 +135,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -191,6 +194,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -313,6 +318,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -373,6 +380,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new
PipelineDef( sourceDef, @@ -441,6 +450,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -496,6 +507,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -569,6 +582,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -766,6 +781,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -972,6 +989,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -1035,6 +1054,8 @@ class FlinkPipelineComposerITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index c489b2e81..4c2536011 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; @@ -213,6 +214,8 @@ class FlinkPipelineTransformITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles"); PipelineDef pipelineDef = new PipelineDef( @@ -266,6 +269,8 @@ class FlinkPipelineTransformITCase { // Setup pipeline Configuration pipelineConfig = new 
Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -350,6 +355,8 @@ class FlinkPipelineTransformITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -436,6 +443,8 @@ class FlinkPipelineTransformITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -516,6 +525,8 @@ class FlinkPipelineTransformITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -608,6 +619,8 @@ class FlinkPipelineTransformITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index f9be7d7dd..c3b412dc7 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; import org.apache.flink.cdc.composer.definition.SinkDef; @@ -139,6 +140,8 @@ public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -207,6 +210,8 @@ public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -273,6 +278,8 @@ public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -341,6 +348,8 @@ 
public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -411,6 +420,8 @@ public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -472,6 +483,8 @@ public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -565,6 +578,8 @@ public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -632,6 +647,8 @@ public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -697,6 +714,8 @@ public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, @@ -758,6 +777,8 @@ public class FlinkPipelineUdfITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( sourceDef, diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index 1f730be62..29fe5db9d 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -210,7 +210,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment { + " type: values\n" + "\n" + "pipeline:\n" - + " parallelism: %d", + + " parallelism: %d\n" + + " schema.change.behavior: evolve", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 92c2622c9..7551add08 100644 --- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -190,7 +190,6 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", - "TruncateTableEvent{tableId=%s.members}", "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null, null, 17.0, null], op=INSERT, meta=()}")); assertNotExists( diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java index 1d1f79f0f..50d5dfb1d 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java @@ -189,7 +189,6 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}", - "TruncateTableEvent{tableId=%s.members}", "DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}")); } @@ -233,11 +232,12 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { + "\n" + "pipeline:\n" + " schema.change.behavior: unexpected\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - schemaEvolveDatabase.getDatabaseName()); + schemaEvolveDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -310,7 +310,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { + "\n" + "pipeline:\n" + " schema.change.behavior: %s\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, @@ -318,7 +318,8 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { mergeTable ? 
"(members|new_members)" : "members", dbName, dbName, - behavior); + behavior, + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 9c6130359..36e7d9a86 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -738,12 +738,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " projection: ID, 'id -> ' || ID AS UID, PRICEALPHA AS PRICE\n" + " filter: ID > 1008\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d\n" + + " schema.change.behavior: evolve", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -835,12 +837,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " projection: \\*, 'id -> ' || ID AS UID\n" + " filter: ID > 1008\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d\n" + + " schema.change.behavior: evolve", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -940,12 +944,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " projection: ID || ' <- id' AS UID, *\n" + " filter: ID > 1008\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d\n" + + " schema.change.behavior: evolve", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 444fb41d2..ae765bae2 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -403,10 +403,6 @@ 
public class SchemaRegistryRequestHandler implements Closeable { } return events; } - case DROP_TABLE: - // We don't drop any tables in Lenient mode. - LOG.info("A drop table event {} has been ignored in Lenient mode.", event); - return Collections.emptyList(); default: return Collections.singletonList(event); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java index 6d66fa878..1171ad07b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.function.HashFunctionProvider; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; +import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -108,7 +109,10 @@ public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEvent> - output.collect(new StreamRecord<>(new PartitioningEvent(toBroadcast, i))); + // Deep-copying each event is required since downstream subTasks might run in the same + // JVM + Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast); + output.collect(new StreamRecord<>(new PartitioningEvent(copiedEvent, i))); + } }
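Reviewer note, not part of the patch: a minimal sketch of how the parser-side defaults introduced above combine under the new LENIENT default behavior. The imports, SchemaChangeEventTypeFamily.ALL, SchemaChangeEventType#getTag, and ChangeEventUtils.resolveSchemaEvolutionOptions are taken from this diff; the wrapping class and main method are illustrative scaffolding only.

import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Mirrors what YamlPipelineDefinitionParser#toSinkDef now does when a sink declares no
// include/exclude schema evolution types and schema.change.behavior is LENIENT (the new default).
public class LenientDefaultsSketch {
    public static void main(String[] args) {
        // Default include list: the tag of every known schema change event type.
        List<String> included =
                Arrays.stream(SchemaChangeEventTypeFamily.ALL)
                        .map(SchemaChangeEventType::getTag)
                        .collect(Collectors.toList());

        // Default exclude list under LENIENT: drop.table and truncate.table.
        List<String> excluded =
                Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
                        .map(SchemaChangeEventType::getTag)
                        .collect(Collectors.toList());

        // Same resolution call the parser makes before building the SinkDef.
        Set<SchemaChangeEventType> resolved =
                ChangeEventUtils.resolveSchemaEvolutionOptions(included, excluded);

        // Expected: ADD_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE, DROP_COLUMN, RENAME_COLUMN
        System.out.println(resolved);
    }
}

Pipelines that relied on the previous EVOLVE default can keep the old behavior by setting "schema.change.behavior: evolve" under the "pipeline:" section of the YAML definition, which is exactly what the updated integration and E2E tests in this patch do.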