[FLINK-36461][transform] Fix schema evolution failure with un-transformed tables

This closes #3632.
yuxiqian 3 months ago committed by GitHub
parent ba74c7c45f
commit b5ab385b8c

@@ -372,6 +372,25 @@ public class SchemaUtils {
return oldSchema.copy(columns);
}
/**
* Determines whether the given schema change event {@code event} should be sent downstream,
* based on whether the given transform rule contains an asterisk and which columns are
* referenced.
*
* <p>For example, if {@code hasAsterisk} is false, all {@code AddColumnEvent} and {@code
* DropColumnEvent} are ignored, since an asterisk-less transform must not emit schema change
* events that change the number of downstream columns.
*
* <p>{@code referencedColumns} is used to determine whether the schema change event affects
* any referenced columns: if a column has been projected out of the downstream schema, its
* corresponding schema change events must not be emitted either.
*
* <p>When {@code hasAsterisk} is true, things are simpler, since no schema change events need
* to be filtered out. All that is required is to rewrite each {@code AddColumnEvent}'s
* inserting position, replacing `FIRST` / `LAST` with column-relative position indicators.
* This is necessary because extra calculated columns might have been added, so the `FIRST` /
* `LAST` positions may differ between upstream and downstream schemas.
*/
public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
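As a hedged usage sketch of the helper above (the `extra` column and the referenced-column list are made up for illustration; Column.physicalColumn and DataTypes are the existing flink-cdc-common factories), an asterisk-less rule swallows column-count-changing events:

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

public class TransformSchemaChangeEventSketch {
    public static void main(String[] args) {
        TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
        // An upstream AddColumnEvent appending one column at the default (LAST) position.
        AddColumnEvent addColumn =
                new AddColumnEvent(
                        tableId,
                        Collections.singletonList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("extra", DataTypes.STRING()))));
        // hasAsterisk == false: Add/DropColumnEvents are filtered out, since they
        // would change the number of downstream columns.
        Optional<SchemaChangeEvent> filtered =
                SchemaUtils.transformSchemaChangeEvent(
                        false, Arrays.asList("id", "name"), addColumn);
        System.out.println(filtered.isPresent()); // expected: false
    }
}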

@@ -1227,6 +1227,101 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
}
@Test
void testTransformUnmatchedSchemaEvolution() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
List<Event> events = generateSchemaEvolutionEvents(tableId);
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
// Setup value sink
Configuration sinkConfig = new Configuration();
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
"foo.bar.baz", // This doesn't match given tableId
"*",
null,
null,
null,
null,
null)),
Collections.emptyList(),
pipelineConfig);
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
// Initial stage
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",
// Add column stage
"AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
// Alter column type stage
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
// Rename column stage
"RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
// Drop column stage
"DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
}
private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
List<Event> events = new ArrayList<>();
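For context, a minimal sketch of why the transform rule in the test above never fires (assuming the Selectors builder from flink-cdc-common, which the transform operators themselves use for matching): the selector "foo.bar.baz" does not match the source table, so mytable1 follows the un-transformed, asterisk-ful path.

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;

public class SelectorMismatchSketch {
    public static void main(String[] args) {
        Selectors selectors =
                new Selectors.SelectorsBuilder().includeTablesByNames("foo.bar.baz").build();
        TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
        // The transform only targets foo.bar.baz, so this table is left un-transformed.
        System.out.println(selectors.isMatch(tableId)); // expected: false
    }
}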

@@ -44,6 +44,7 @@ import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -242,6 +243,8 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws Exception {
TableId tableId = event.tableId();
List<String> columnNamesBeforeChange = Collections.emptyList();
if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
Set<String> projectedColumnsSet =
@@ -286,6 +289,9 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
createTableEvent.getSchema().getColumnNames().stream()
.filter(projectedColumnsSet::contains)
.collect(Collectors.toList()));
} else {
columnNamesBeforeChange =
getPostTransformChangeInfo(tableId).getPreTransformedSchema().getColumnNames();
}
Schema schema;
@@ -304,9 +310,12 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
if (event instanceof CreateTableEvent) {
return Optional.of(new CreateTableEvent(tableId, projectedSchema));
} else if (hasAsteriskMap.getOrDefault(tableId, true)) {
// See comments in PreTransformOperator#cacheChangeSchema method.
return SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
} else {
return SchemaUtils.transformSchemaChangeEvent(
hasAsteriskMap.get(tableId), projectedColumnsMap.get(tableId), event);
false, projectedColumnsMap.get(tableId), event);
}
}
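A condensed sketch of the fallback above (the map is a hypothetical stand-in for the operator's hasAsteriskMap): a table that no transform rule ever matched has no cached entry, and getOrDefault(..., true) routes it through the asterisk-ful branch so its schema change events keep flowing.

import org.apache.flink.cdc.common.event.TableId;

import java.util.HashMap;
import java.util.Map;

public class AsteriskDefaultSketch {
    public static void main(String[] args) {
        Map<TableId, Boolean> hasAsteriskMap = new HashMap<>(); // hypothetical stand-in
        TableId untouched = TableId.tableId("default_namespace", "default_schema", "mytable1");
        // No transform rule matched this table, so nothing was cached for it; the
        // default of true treats it as asterisk-ful (a "*" projection).
        System.out.println(hasAsteriskMap.getOrDefault(untouched, true)); // expected: true
    }
}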

@@ -29,7 +29,6 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -52,7 +51,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -73,7 +71,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
private List<UserDefinedFunctionDescriptor> udfDescriptors;
private Map<TableId, PreTransformProcessor> preTransformProcessorMap;
private Map<TableId, Boolean> hasAsteriskMap;
private Map<TableId, List<String>> referencedColumnsMap;
public static PreTransformOperator.Builder newBuilder() {
return new PreTransformOperator.Builder();
@@ -165,7 +162,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
}
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.hasAsteriskMap = new ConcurrentHashMap<>();
this.referencedColumnsMap = new ConcurrentHashMap<>();
}
@Override
@@ -188,8 +184,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
new CreateTableEvent(
stateTableChangeInfo.getTableId(),
stateTableChangeInfo.getPreTransformedSchema());
// hasAsteriskMap and referencedColumnsMap needs to be recalculated after restoring
// from a checkpoint.
// hasAsteriskMap needs to be recalculated after restoring from a checkpoint.
cacheTransformRuleInfo(restoredCreateTableEvent);
// Since PostTransformOperator doesn't preserve state, pre-transformed schema
@@ -268,9 +263,27 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
Schema originalSchema =
SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
Optional<SchemaChangeEvent> schemaChangeEvent =
SchemaUtils.transformSchemaChangeEvent(
hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event);
Optional<SchemaChangeEvent> schemaChangeEvent;
if (hasAsteriskMap.getOrDefault(tableId, true)) {
// If this TableId is asterisk-ful, we should use the latest upstream schema's column
// names as the referenced columns when performing schema evolution, not the original
// ones captured at table creation time. If hasAsteriskMap has no entry for this
// TableId, it has not been referenced by any transform rule and should be regarded
// as asterisk-ful by default.
schemaChangeEvent =
SchemaUtils.transformSchemaChangeEvent(
true, tableChangeInfo.getSourceSchema().getColumnNames(), event);
} else {
// Otherwise, use the pre-transformed columns to decide whether the given schema
// change event should be passed downstream: it is forwarded only if the affected
// column is present in the pre-transformed schema.
schemaChangeEvent =
SchemaUtils.transformSchemaChangeEvent(
false,
tableChangeInfo.getPreTransformedSchema().getColumnNames(),
event);
}
if (schemaChangeEvent.isPresent()) {
preTransformedSchema =
SchemaUtils.applySchemaChangeEvent(
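A hedged sketch of the column-name choice above (Schema.newBuilder is the real flink-cdc-common API; the boolean is a stand-in for hasAsteriskMap.getOrDefault): asterisk-ful tables pass the latest upstream column names as referenced columns, while asterisk-less tables pass the pre-transformed ones.

import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.List;

public class ReferencedColumnsSketch {
    public static void main(String[] args) {
        Schema sourceSchema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("name", DataTypes.STRING())
                        .physicalColumn("age", DataTypes.INT())
                        .build();
        Schema preTransformedSchema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("name", DataTypes.STRING())
                        .build();
        boolean asteriskFul = true; // stand-in for hasAsteriskMap.getOrDefault(tableId, true)
        // Asterisk-ful: follow the latest upstream schema. Asterisk-less: only columns
        // surviving pre-transformation may produce downstream schema changes.
        List<String> referenced =
                asteriskFul
                        ? sourceSchema.getColumnNames()
                        : preTransformedSchema.getColumnNames();
        System.out.println(referenced); // expected: [id, name, age]
    }
}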
@@ -283,26 +296,8 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
TableId tableId = createTableEvent.tableId();
Set<String> referencedColumnsSet =
transforms.stream()
.filter(t -> t.getSelectors().isMatch(tableId))
.flatMap(
rule ->
TransformParser.generateReferencedColumns(
rule.getProjection()
.map(TransformProjection::getProjection)
.orElse(null),
rule.getFilter()
.map(TransformFilter::getExpression)
.orElse(null),
createTableEvent.getSchema().getColumns())
.stream())
.map(Column::getName)
.collect(Collectors.toSet());
boolean notTransformed =
transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
if (notTransformed) {
// If this TableId isn't present in any transform block, it should behave like a "*"
// projection and be regarded as asterisk-ful.
@@ -320,11 +315,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
}
referencedColumnsMap.put(
createTableEvent.tableId(),
createTableEvent.getSchema().getColumnNames().stream()
.filter(referencedColumnsSet::contains)
.collect(Collectors.toList()));
}
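A minimal sketch of the notTransformed check above (the list of selectors is a hypothetical stand-in for the operator's transform rules; Selectors is the real matching API):

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;

import java.util.Collections;
import java.util.List;

public class NotTransformedSketch {
    public static void main(String[] args) {
        List<Selectors> ruleSelectors =
                Collections.singletonList(
                        new Selectors.SelectorsBuilder()
                                .includeTablesByNames("foo.bar.baz")
                                .build());
        TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
        // True when no transform rule matches: the table behaves like a "*" projection.
        boolean notTransformed = ruleSelectors.stream().noneMatch(s -> s.isMatch(tableId));
        System.out.println(notTransformed); // expected: true
    }
}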
private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
