[FLINK-36128][cdc-runtime] Fix potentially unrecoverable in-flight data exception by promoting LENIENT as the default schema change behavior

This closes #3574.
pull/3576/head
yuxiqian 5 months ago committed by GitHub
parent aea2b6aa9c
commit 2e938a92f5

@@ -19,6 +19,8 @@ package org.apache.flink.cdc.cli.parser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -35,11 +37,14 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
@@ -99,6 +104,19 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {
// UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
// it's not of plain data types and must be removed before calling toPipelineConfig.
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
// Source is required
SourceDef sourceDef =
toSourceDef(
@@ -113,7 +131,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
checkNotNull(
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline definition",
SINK_KEY));
SINK_KEY),
schemaChangeBehavior);
// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
@@ -128,14 +147,6 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
// UDFs are optional
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
@@ -162,7 +173,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
return new SourceDef(type, name, Configuration.fromMap(sourceMap));
}
private SinkDef toSinkDef(JsonNode sinkNode) {
private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
List<String> includedSETypes = new ArrayList<>();
List<String> excludedSETypes = new ArrayList<>();
@@ -172,6 +183,23 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES))
.ifPresent(e -> e.forEach(tag -> excludedSETypes.add(tag.asText())));
if (includedSETypes.isEmpty()) {
// If no schema evolution types are specified, include all schema evolution types by
// default.
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
.map(SchemaChangeEventType::getTag)
.forEach(includedSETypes::add);
}
if (excludedSETypes.isEmpty()
&& SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
// In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default. This could be
// overridden by manually specifying excluded types.
Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
.map(SchemaChangeEventType::getTag)
.forEach(excludedSETypes::add);
}
Set<SchemaChangeEventType> declaredSETypes =
resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes);
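For illustration, a minimal sketch (reusing the names and imports shown in this parser diff) of how the new defaults resolve when a sink declares neither include nor exclude lists under LENIENT behavior; the resulting set matches the expectations in the parser tests below:

    // Everything is included by default...
    List<String> includedSETypes = new ArrayList<>();
    Arrays.stream(SchemaChangeEventTypeFamily.ALL)
            .map(SchemaChangeEventType::getTag)
            .forEach(includedSETypes::add);
    // ...but LENIENT excludes the destructive events unless overridden.
    List<String> excludedSETypes = new ArrayList<>();
    Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
            .map(SchemaChangeEventType::getTag)
            .forEach(excludedSETypes::add);
    Set<SchemaChangeEventType> declaredSETypes =
            resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes);
    // declaredSETypes == {ADD_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE,
    //                     DROP_COLUMN, RENAME_COLUMN}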

@@ -27,6 +27,7 @@ import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.composer.definition.UdfDef;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
@@ -37,6 +38,11 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@@ -384,7 +390,13 @@ class YamlPipelineDefinitionParserTest {
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("bootstrap-servers", "localhost:9092")
.build())),
.build()),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.singletonList(
new RouteDef(
"mydb.default.app_order_.*",
@@ -401,7 +413,16 @@ class YamlPipelineDefinitionParserTest {
private final PipelineDef minimizedDef =
new PipelineDef(
new SourceDef("mysql", null, new Configuration()),
new SinkDef("kafka", null, new Configuration()),
new SinkDef(
"kafka",
null,
new Configuration(),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
@@ -474,7 +495,16 @@ class YamlPipelineDefinitionParserTest {
private final PipelineDef pipelineDefWithUdf =
new PipelineDef(
new SourceDef("values", null, new Configuration()),
new SinkDef("values", null, new Configuration()),
new SinkDef(
"values",
null,
new Configuration(),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.emptyList(),
Collections.singletonList(
new TransformDef(

@@ -22,13 +22,23 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving;
/** An enumeration of schema change event types for {@link SchemaChangeEvent}. */
@PublicEvolving
public enum SchemaChangeEventType {
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE;
ADD_COLUMN("add.column"),
ALTER_COLUMN_TYPE("alter.column.type"),
CREATE_TABLE("create.table"),
DROP_COLUMN("drop.column"),
DROP_TABLE("drop.table"),
RENAME_COLUMN("rename.column"),
TRUNCATE_TABLE("truncate.table");
private final String tag;
SchemaChangeEventType(String tag) {
this.tag = tag;
}
public String getTag() {
return tag;
}
public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {

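The string tags introduced here are what YAML pipeline definitions reference when including or excluding event types. A small sketch of the mapping (the YAML key shown in the comment is an assumption based on the EXCLUDE_SCHEMA_EVOLUTION_TYPES constant used in the parser above):

    String tag = SchemaChangeEventType.TRUNCATE_TABLE.getTag(); // "truncate.table"
    // Tags appear verbatim in sink definitions, e.g.:
    //   exclude.schema.evolution.types: [drop.table, truncate.table]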
@@ -48,7 +48,7 @@ public class PipelineOptions {
public static final ConfigOption<SchemaChangeBehavior> PIPELINE_SCHEMA_CHANGE_BEHAVIOR =
ConfigOptions.key("schema.change.behavior")
.enumType(SchemaChangeBehavior.class)
.defaultValue(SchemaChangeBehavior.EVOLVE)
.defaultValue(SchemaChangeBehavior.LENIENT)
.withDescription(
Description.builder()
.text("Behavior for handling schema change events. ")

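A minimal sketch of what this default change means in practice: a pipeline Configuration that never sets schema.change.behavior now resolves to LENIENT, which is why the integration tests below explicitly opt back into EVOLVE to keep their old semantics:

    Configuration pipelineConfig = new Configuration();
    // Nothing set explicitly, so the option falls back to its new default:
    SchemaChangeBehavior behavior =
            pipelineConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
    // behavior == SchemaChangeBehavior.LENIENT (was EVOLVE before this commit)

    // Restoring the previous behavior is an explicit opt-in:
    pipelineConfig.set(
            PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);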
@@ -90,12 +90,8 @@ public class ChangeEventUtils {
List<String> includedSchemaEvolutionTypes, List<String> excludedSchemaEvolutionTypes) {
List<SchemaChangeEventType> resultTypes = new ArrayList<>();
if (includedSchemaEvolutionTypes.isEmpty()) {
resultTypes.addAll(Arrays.asList(SchemaChangeEventTypeFamily.ALL));
} else {
for (String includeTag : includedSchemaEvolutionTypes) {
resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
}
for (String includeTag : includedSchemaEvolutionTypes) {
resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
}
for (String excludeTag : excludedSchemaEvolutionTypes) {

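A hedged sketch of the behavioral consequence: an empty include list no longer silently expands to every event type, so callers (such as the YAML parser above) must pass the full tag list themselves:

    Set<SchemaChangeEventType> resolved =
            ChangeEventUtils.resolveSchemaEvolutionOptions(
                    Collections.emptyList(), Collections.emptyList());
    // resolved is now empty; before this change it contained
    // SchemaChangeEventTypeFamily.ALL.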
@@ -17,11 +17,16 @@
package org.apache.flink.cdc.common.utils;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
@@ -36,9 +41,12 @@ import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.a
public class ChangeEventUtilsTest {
@Test
public void testResolveSchemaEvolutionOptions() {
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.emptyList()))
List<String> allTags =
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
.map(SchemaChangeEventType::getTag)
.collect(Collectors.toList());
assertThat(ChangeEventUtils.resolveSchemaEvolutionOptions(allTags, Collections.emptyList()))
.isEqualTo(
Sets.set(
TRUNCATE_TABLE,
@@ -51,7 +59,7 @@ public class ChangeEventUtilsTest {
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.singletonList("drop")))
allTags, Collections.singletonList("drop")))
.isEqualTo(
Sets.set(
ADD_COLUMN,
@@ -73,7 +81,7 @@ public class ChangeEventUtilsTest {
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
Collections.emptyList(), Collections.singletonList("drop.column")))
allTags, Collections.singletonList("drop.column")))
.isEqualTo(
Sets.set(
ADD_COLUMN,

@@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
@@ -134,6 +135,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -191,6 +194,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -313,6 +318,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -373,6 +380,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -441,6 +450,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -496,6 +507,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -569,6 +582,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -766,6 +781,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -972,6 +989,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -1035,6 +1054,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,

@@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
@@ -213,6 +214,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");
PipelineDef pipelineDef =
new PipelineDef(
@@ -266,6 +269,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -350,6 +355,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -436,6 +443,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -516,6 +525,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -608,6 +619,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,

@@ -19,6 +19,7 @@ package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
@@ -139,6 +140,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -207,6 +210,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -273,6 +278,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -341,6 +348,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -411,6 +420,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -472,6 +483,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -565,6 +578,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -632,6 +647,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -697,6 +714,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -758,6 +777,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,

@@ -210,7 +210,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: %d",
+ " parallelism: %d\n"
+ " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,

@@ -190,7 +190,6 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null, null, 17.0, null], op=INSERT, meta=()}"));
assertNotExists(

@@ -189,7 +189,6 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}"));
}
@@ -233,11 +232,12 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: unexpected\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
schemaEvolveDatabase.getDatabaseName());
schemaEvolveDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -310,7 +310,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: %s\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@@ -318,7 +318,8 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
mergeTable ? "(members|new_members)" : "members",
dbName,
dbName,
behavior);
behavior,
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");

@@ -738,12 +738,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " projection: ID, 'id -> ' || ID AS UID, PRICEALPHA AS PRICE\n"
+ " filter: ID > 1008\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d\n"
+ " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -835,12 +837,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " projection: \\*, 'id -> ' || ID AS UID\n"
+ " filter: ID > 1008\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d\n"
+ " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -940,12 +944,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " projection: ID || ' <- id' AS UID, *\n"
+ " filter: ID > 1008\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d\n"
+ " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");

@@ -403,10 +403,6 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
return events;
}
case DROP_TABLE:
// We don't drop any tables in Lenient mode.
LOG.info("A drop table event {} has been ignored in Lenient mode.", event);
return Collections.emptyList();
default:
return Collections.singletonList(event);
}

@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -108,7 +109,10 @@ public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEve
private void broadcastEvent(Event toBroadcast) {
for (int i = 0; i < downstreamParallelism; i++) {
output.collect(new StreamRecord<>(new PartitioningEvent(toBroadcast, i)));
// Deep-copying each event is required since downstream subTasks might run in the same
// JVM
Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast);
output.collect(new StreamRecord<>(new PartitioningEvent(copiedEvent, i)));
}
}
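For context, a hedged illustration of the invariant the copy establishes (the TruncateTableEvent constructor and TableId factory used here are assumptions based on types appearing elsewhere in this diff):

    Event event = new TruncateTableEvent(TableId.tableId("mydb", "members"));
    Event copied = EventSerializer.INSTANCE.copy(event);
    // Each downstream channel gets a distinct instance, so a subtask running in
    // the same JVM can mutate its copy without corrupting another channel's
    // in-flight event.
    assert copied != event;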
