[FLINK-36514][cdc-cli] Fix unable to override exclude schema types in lenient mode

This closes #3637.

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
pull/3596/head
yuxiqian 3 months ago committed by GitHub
parent 84f97ed4c1
commit 4df3d92721
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -179,6 +179,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
List<String> includedSETypes = new ArrayList<>();
List<String> excludedSETypes = new ArrayList<>();
boolean excludedFieldNotPresent = sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES) == null;
Optional.ofNullable(sinkNode.get(INCLUDE_SCHEMA_EVOLUTION_TYPES))
.ifPresent(e -> e.forEach(tag -> includedSETypes.add(tag.asText())));
@ -194,8 +195,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
.forEach(includedSETypes::add);
}
if (excludedSETypes.isEmpty()
&& SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
if (excludedFieldNotPresent && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
// In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default. This could be
// overridden by manually specifying excluded types.
Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)

@ -18,6 +18,7 @@
package org.apache.flink.cdc.cli.parser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
@ -38,12 +39,15 @@ import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@ -191,6 +195,99 @@ class YamlPipelineDefinitionParserTest {
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
}
@Test
void testSchemaEvolutionTypesConfiguration() throws Exception {
testSchemaEvolutionTypesParsing(
"evolve",
null,
null,
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
testSchemaEvolutionTypesParsing(
"try_evolve",
null,
null,
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
testSchemaEvolutionTypesParsing(
"evolve",
"[column, table]",
"[drop]",
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
testSchemaEvolutionTypesParsing(
"lenient",
null,
null,
ImmutableSet.of(
ADD_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE, DROP_COLUMN, RENAME_COLUMN));
testSchemaEvolutionTypesParsing(
"lenient",
null,
"[]",
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
}
private void testSchemaEvolutionTypesParsing(
String behavior, String included, String excluded, Set<SchemaChangeEventType> expected)
throws Exception {
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
parser.parse(
"source:\n"
+ " type: foo\n"
+ "sink:\n"
+ " type: bar\n"
+ (included != null
? String.format(" include.schema.changes: %s\n", included)
: "")
+ (excluded != null
? String.format(" exclude.schema.changes: %s\n", excluded)
: "")
+ "pipeline:\n"
+ " schema.change.behavior: "
+ behavior
+ "\n"
+ " parallelism: 1\n",
new Configuration());
assertThat(pipelineDef)
.isEqualTo(
new PipelineDef(
new SourceDef("foo", null, new Configuration()),
new SinkDef("bar", null, new Configuration(), expected),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("schema.change.behavior", behavior)
.put("parallelism", "1")
.build())));
}
private final PipelineDef fullDef =
new PipelineDef(
new SourceDef(

Loading…
Cancel
Save