From 4df3d9272135103304d7413aea20bd3b9aafa2aa Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Wed, 6 Nov 2024 19:58:10 +0800
Subject: [PATCH] [FLINK-36514][cdc-cli] Fix unable to override exclude schema
 types in lenient mode

This closes #3637.

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
---
 .../parser/YamlPipelineDefinitionParser.java  |  4 +-
 .../YamlPipelineDefinitionParserTest.java     | 97 +++++++++++++++++++
 2 files changed, 99 insertions(+), 2 deletions(-)

diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 9d2f4c998..3e65191e2 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -179,6 +179,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
         List<String> includedSETypes = new ArrayList<>();
         List<String> excludedSETypes = new ArrayList<>();
+        boolean excludedFieldNotPresent = sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES) == null;
 
         Optional.ofNullable(sinkNode.get(INCLUDE_SCHEMA_EVOLUTION_TYPES))
                 .ifPresent(e -> e.forEach(tag -> includedSETypes.add(tag.asText())));
@@ -194,8 +195,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
                     .forEach(includedSETypes::add);
         }
 
-        if (excludedSETypes.isEmpty()
-                && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
+        if (excludedFieldNotPresent && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
             // In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default. This could be
             // overridden by manually specifying excluded types.
             Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 1291b34ea..c1c47904e 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.cli.parser;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
 import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -38,12 +39,15 @@ import java.time.Duration;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Set;
 
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
 import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@@ -191,6 +195,99 @@ class YamlPipelineDefinitionParserTest {
         assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
     }
 
+    @Test
+    void testSchemaEvolutionTypesConfiguration() throws Exception {
+        testSchemaEvolutionTypesParsing(
+                "evolve",
+                null,
+                null,
+                ImmutableSet.of(
+                        ADD_COLUMN,
+                        ALTER_COLUMN_TYPE,
+                        CREATE_TABLE,
+                        DROP_COLUMN,
+                        DROP_TABLE,
+                        RENAME_COLUMN,
+                        TRUNCATE_TABLE));
+        testSchemaEvolutionTypesParsing(
+                "try_evolve",
+                null,
+                null,
+                ImmutableSet.of(
+                        ADD_COLUMN,
+                        ALTER_COLUMN_TYPE,
+                        CREATE_TABLE,
+                        DROP_COLUMN,
+                        DROP_TABLE,
+                        RENAME_COLUMN,
+                        TRUNCATE_TABLE));
+        testSchemaEvolutionTypesParsing(
+                "evolve",
+                "[column, table]",
+                "[drop]",
+                ImmutableSet.of(
+                        ADD_COLUMN,
+                        ALTER_COLUMN_TYPE,
+                        CREATE_TABLE,
+                        RENAME_COLUMN,
+                        TRUNCATE_TABLE));
+        testSchemaEvolutionTypesParsing(
+                "lenient",
+                null,
+                null,
+                ImmutableSet.of(
+                        ADD_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE, DROP_COLUMN, RENAME_COLUMN));
+        testSchemaEvolutionTypesParsing(
+                "lenient",
+                null,
+                "[]",
+                ImmutableSet.of(
+                        ADD_COLUMN,
+                        ALTER_COLUMN_TYPE,
+                        CREATE_TABLE,
+                        DROP_COLUMN,
+                        DROP_TABLE,
+                        RENAME_COLUMN,
+                        TRUNCATE_TABLE));
+    }
+
+    private void testSchemaEvolutionTypesParsing(
+            String behavior, String included, String excluded, Set<SchemaChangeEventType> expected)
+            throws Exception {
+        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef =
+                parser.parse(
+                        "source:\n"
+                                + "  type: foo\n"
+                                + "sink:\n"
+                                + "  type: bar\n"
+                                + (included != null
+                                        ? String.format("  include.schema.changes: %s\n", included)
+                                        : "")
+                                + (excluded != null
+                                        ? String.format("  exclude.schema.changes: %s\n", excluded)
+                                        : "")
+                                + "pipeline:\n"
+                                + "  schema.change.behavior: "
+                                + behavior
+                                + "\n"
+                                + "  parallelism: 1\n",
+                        new Configuration());
+        assertThat(pipelineDef)
+                .isEqualTo(
+                        new PipelineDef(
+                                new SourceDef("foo", null, new Configuration()),
+                                new SinkDef("bar", null, new Configuration(), expected),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Configuration.fromMap(
+                                        ImmutableMap.<String, String>builder()
+                                                .put("schema.change.behavior", behavior)
+                                                .put("parallelism", "1")
+                                                .build())));
+    }
+
     private final PipelineDef fullDef =
             new PipelineDef(
                     new SourceDef(