[FLINK-36128][cdc-runtime] Fix potential unrecoverable in-flight data exception by promoting LENIENT as the default schema change behavior

This closes #3574.

(cherry picked from commit 2e938a92f5)
pull/3582/head
yuxiqian 5 months ago committed by Leonard Xu
parent daf28ddc99
commit debd43cdd7

@ -19,6 +19,8 @@ package org.apache.flink.cdc.cli.parser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
@ -35,11 +37,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
@ -99,6 +103,19 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {
// UDFs are optional. We parse UDFs first and remove them from the pipelineDefJsonNode, since
// they are not plain data types and must be removed before calling toPipelineConfig.
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
// Source is required
SourceDef sourceDef =
toSourceDef(
@ -113,7 +130,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
checkNotNull(
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline definition",
SINK_KEY));
SINK_KEY),
schemaChangeBehavior);
// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
@ -128,14 +146,6 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
// UDFs are optional
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
@ -162,7 +172,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
return new SourceDef(type, name, Configuration.fromMap(sourceMap));
}
private SinkDef toSinkDef(JsonNode sinkNode) {
private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
List<String> includedSETypes = new ArrayList<>();
List<String> excludedSETypes = new ArrayList<>();
@ -172,6 +182,14 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES))
.ifPresent(e -> e.forEach(tag -> excludedSETypes.add(tag.asText())));
if (includedSETypes.isEmpty()) {
// If no schema evolution types are specified, include all schema evolution types by
// default.
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
.map(SchemaChangeEventType::getTag)
.forEach(includedSETypes::add);
}
Set<SchemaChangeEventType> declaredSETypes =
resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes);

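Note: with this change, a sink block that specifies no include/exclude lists declares every schema change event type up front. A minimal sketch of the expansion, assuming the imports shown in the hunks above (local variable names are illustrative):

List<String> included = new ArrayList<>(); // nothing specified in the sink YAML block
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
        .map(SchemaChangeEventType::getTag)
        .forEach(included::add);
Set<SchemaChangeEventType> declared =
        resolveSchemaEvolutionOptions(included, new ArrayList<>());
// declared: CREATE_TABLE, ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN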
@ -27,6 +27,7 @@ import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.composer.definition.UdfDef;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
@ -37,6 +38,11 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@ -384,7 +390,13 @@ class YamlPipelineDefinitionParserTest {
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("bootstrap-servers", "localhost:9092")
.build())),
.build()),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.singletonList(
new RouteDef(
"mydb.default.app_order_.*",
@ -401,7 +413,16 @@ class YamlPipelineDefinitionParserTest {
private final PipelineDef minimizedDef =
new PipelineDef(
new SourceDef("mysql", null, new Configuration()),
new SinkDef("kafka", null, new Configuration()),
new SinkDef(
"kafka",
null,
new Configuration(),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
@ -474,7 +495,16 @@ class YamlPipelineDefinitionParserTest {
private final PipelineDef pipelineDefWithUdf =
new PipelineDef(
new SourceDef("values", null, new Configuration()),
new SinkDef("values", null, new Configuration()),
new SinkDef(
"values",
null,
new Configuration(),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.emptyList(),
Collections.singletonList(
new TransformDef(

@ -22,11 +22,21 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving;
/** An enumeration of schema change event types for {@link SchemaChangeEvent}. */
@PublicEvolving
public enum SchemaChangeEventType {
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
RENAME_COLUMN;
ADD_COLUMN("add.column"),
ALTER_COLUMN_TYPE("alter.column.type"),
CREATE_TABLE("create.table"),
DROP_COLUMN("drop.column"),
RENAME_COLUMN("rename.column");
private final String tag;
SchemaChangeEventType(String tag) {
this.tag = tag;
}
public String getTag() {
return tag;
}
public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {

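Note: each event type now carries a stable string tag matching the option syntax used in pipeline YAML. A small sketch of the mapping, assuming only the enum shown above:

for (SchemaChangeEventType type : SchemaChangeEventType.values()) {
    System.out.println(type.name() + " -> " + type.getTag());
}
// ADD_COLUMN -> add.column
// ALTER_COLUMN_TYPE -> alter.column.type
// CREATE_TABLE -> create.table
// DROP_COLUMN -> drop.column
// RENAME_COLUMN -> rename.column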
@ -48,7 +48,7 @@ public class PipelineOptions {
public static final ConfigOption<SchemaChangeBehavior> PIPELINE_SCHEMA_CHANGE_BEHAVIOR =
ConfigOptions.key("schema.change.behavior")
.enumType(SchemaChangeBehavior.class)
.defaultValue(SchemaChangeBehavior.EVOLVE)
.defaultValue(SchemaChangeBehavior.LENIENT)
.withDescription(
Description.builder()
.text("Behavior for handling schema change events. ")

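Note: since the default is now LENIENT, pipelines that relied on the previous EVOLVE default must opt back in explicitly. A minimal sketch of both routes, using only the key and option touched by this commit:

// Programmatically, as the updated tests below do:
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(
        PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);

// Or equivalently in the pipeline YAML definition:
// pipeline:
//   schema.change.behavior: evolve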
@ -95,12 +95,8 @@ public class ChangeEventUtils {
List<String> includedSchemaEvolutionTypes, List<String> excludedSchemaEvolutionTypes) {
List<SchemaChangeEventType> resultTypes = new ArrayList<>();
if (includedSchemaEvolutionTypes.isEmpty()) {
resultTypes.addAll(Arrays.asList(SchemaChangeEventTypeFamily.ALL));
} else {
for (String includeTag : includedSchemaEvolutionTypes) {
resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
}
for (String includeTag : includedSchemaEvolutionTypes) {
resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
}
for (String excludeTag : excludedSchemaEvolutionTypes) {

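Note: resolveSchemaEvolutionOptions no longer treats an empty include list as "all"; callers such as the parser above now pass the full tag list explicitly. Resolution of non-empty lists is unchanged, e.g. (mirroring the updated test below):

Set<SchemaChangeEventType> resolved =
        ChangeEventUtils.resolveSchemaEvolutionOptions(
                Collections.singletonList("column"),
                Collections.singletonList("drop.column"));
// resolved: ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN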
@ -17,59 +17,67 @@
package org.apache.flink.cdc.common.utils;
import org.assertj.core.api.Assertions;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat;
/** A test for the {@link org.apache.flink.cdc.common.utils.ChangeEventUtils}. */
public class ChangeEventUtilsTest {
@Test
public void testResolveSchemaEvolutionOptions() {
Assertions.assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.emptyList()))
List<String> allTags =
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
.map(SchemaChangeEventType::getTag)
.collect(Collectors.toList());
assertThat(ChangeEventUtils.resolveSchemaEvolutionOptions(allTags, Collections.emptyList()))
.isEqualTo(
Sets.set(
RENAME_COLUMN,
CREATE_TABLE,
ADD_COLUMN,
ALTER_COLUMN_TYPE,
DROP_COLUMN,
RENAME_COLUMN));
ADD_COLUMN,
DROP_COLUMN));
Assertions.assertThat(
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.singletonList("drop")))
.isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN));
allTags, Collections.singletonList("drop")))
.isEqualTo(Sets.set(ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN, CREATE_TABLE));
Assertions.assertThat(
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Arrays.asList("create", "add"), Collections.emptyList()))
.isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN));
.isEqualTo(Sets.set(ADD_COLUMN, CREATE_TABLE));
Assertions.assertThat(
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.singletonList("column"),
Collections.singletonList("drop.column")))
.isEqualTo(Sets.set(ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN));
Assertions.assertThat(
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.singletonList("drop.column")))
.isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN, ALTER_COLUMN_TYPE, RENAME_COLUMN));
allTags, Collections.singletonList("drop.column")))
.isEqualTo(Sets.set(ADD_COLUMN, RENAME_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE));
}
@Test
public void testResolveSchemaEvolutionTag() {
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("all"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("all"))
.isEqualTo(
Arrays.asList(
ADD_COLUMN,
@ -78,41 +86,38 @@ public class ChangeEventUtilsTest {
DROP_COLUMN,
RENAME_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column"))
.isEqualTo(
Arrays.asList(ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table"))
.isEqualTo(Collections.singletonList(CREATE_TABLE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename"))
.isEqualTo(Collections.singletonList(RENAME_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename.column"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename.column"))
.isEqualTo(Collections.singletonList(RENAME_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop"))
.isEqualTo(Collections.singletonList(DROP_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop.column"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop.column"))
.isEqualTo(Collections.singletonList(DROP_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create"))
.isEqualTo(Collections.singletonList(CREATE_TABLE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create.table"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create.table"))
.isEqualTo(Collections.singletonList(CREATE_TABLE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter"))
.isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.type"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.type"))
.isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add"))
.isEqualTo(Collections.singletonList(ADD_COLUMN));
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column"))
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column"))
.isEqualTo(Collections.singletonList(ADD_COLUMN));
}
}

@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
@ -134,6 +135,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -191,6 +194,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -313,6 +318,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -373,6 +380,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -441,6 +450,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -496,6 +507,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -569,6 +582,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -766,6 +781,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -972,6 +989,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -1035,6 +1054,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,

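Note: the ITCase hunks above and below pin the old EVOLVE behavior so their expected output stays valid under the new LENIENT default. A hypothetical helper (not part of this commit) illustrates the repeated setup:

// Hypothetical convenience method; the tests in this commit inline these lines instead.
private static Configuration evolveConfig() {
    Configuration config = new Configuration();
    config.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
    config.set(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
    return config;
}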
@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
@ -213,6 +214,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");
PipelineDef pipelineDef =
new PipelineDef(
@ -266,6 +269,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -350,6 +355,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -436,6 +443,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -516,6 +525,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -608,6 +619,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,

@ -19,6 +19,7 @@ package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
@ -139,6 +140,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -207,6 +210,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -273,6 +278,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -341,6 +348,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -411,6 +420,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -472,6 +483,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -565,6 +578,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -632,6 +647,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -697,6 +714,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@ -758,6 +777,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,

@ -210,7 +210,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: %d",
+ " parallelism: %d\n"
+ " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,

@ -228,11 +228,12 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: unexpected\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
schemaEvolveDatabase.getDatabaseName());
schemaEvolveDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -305,7 +306,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: %s\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@ -313,7 +314,8 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
mergeTable ? "(members|new_members)" : "members",
dbName,
dbName,
behavior);
behavior,
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");

@ -738,12 +738,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " projection: ID, 'id -> ' || ID AS UID, PRICEALPHA AS PRICE\n"
+ " filter: ID > 1008\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d\n"
+ " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -835,12 +837,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " projection: \\*, 'id -> ' || ID AS UID\n"
+ " filter: ID > 1008\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d\n"
+ " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -940,12 +944,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " projection: ID || ' <- id' AS UID, *\n"
+ " filter: ID > 1008\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d\n"
+ " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");

@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@ -108,7 +109,10 @@ public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEve
private void broadcastEvent(Event toBroadcast) {
for (int i = 0; i < downstreamParallelism; i++) {
output.collect(new StreamRecord<>(new PartitioningEvent(toBroadcast, i)));
// Deep-copying each event is required since downstream subTasks might run in the same
// JVM
Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast);
output.collect(new StreamRecord<>(new PartitioningEvent(copiedEvent, i)));
}
}

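Note: the copy matters because downstream subtasks may share a JVM, so broadcasting one Event instance would alias mutable state across operator chains. A self-contained sketch of the hazard, using plain Java stand-ins rather than the CDC classes:

import java.util.ArrayList;
import java.util.List;

public class BroadcastAliasingSketch {
    static class MutableEvent {
        final List<String> columns = new ArrayList<>();
    }

    public static void main(String[] args) {
        MutableEvent event = new MutableEvent();

        // Broadcasting the same reference: both "subtasks" observe later mutations.
        MutableEvent subtask0 = event;
        MutableEvent subtask1 = event;
        subtask0.columns.add("id");
        System.out.println(subtask1.columns); // [id] -- unintended sharing

        // Deep-copying per target (the role EventSerializer.INSTANCE.copy plays above):
        MutableEvent copy = new MutableEvent();
        copy.columns.addAll(event.columns);
        copy.columns.add("name");
        System.out.println(event.columns); // still [id]
    }
}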