[FLINK-34653][cdc][route] Support table merging with route rules

This closes #3129.
Qingsheng Ren 10 months ago committed by GitHub
parent 9d150c07a3
commit 6017b16528
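For context, a merging route is declared with the existing RouteDef (source table pattern, sink table, optional description), exactly as the new integration tests below do. A minimal sketch, not part of the patch itself:

List<RouteDef> routes =
        Collections.singletonList(
                new RouteDef(
                        "default_namespace.default_schema.mytable[0-9]", // source tables to merge
                        "default_namespace.default_schema.merged",       // routed sink table
                        null));                                          // optional description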

@ -15,107 +15,20 @@
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.route;
package org.apache.flink.cdc.common.utils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.common.utils.Preconditions.checkState;
/** A map function that applies user-defined routing logics. */
public class RouteFunction extends RichMapFunction<Event, Event> {
private final List<Tuple2<String, TableId>> routingRules;
private transient List<Tuple2<Selectors, TableId>> routes;
public static Builder newBuilder() {
return new Builder();
}
/** Builder of {@link RouteFunction}. */
public static class Builder {
private final List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
public Builder addRoute(String tableInclusions, TableId replaceBy) {
routingRules.add(Tuple2.of(tableInclusions, replaceBy));
return this;
}
public RouteFunction build() {
return new RouteFunction(routingRules);
}
}
private RouteFunction(List<Tuple2<String, TableId>> routingRules) {
this.routingRules = routingRules;
}
@Override
public void open(Configuration parameters) throws Exception {
routes =
routingRules.stream()
.map(
tuple2 -> {
String tableInclusions = tuple2.f0;
TableId replaceBy = tuple2.f1;
Selectors selectors =
new Selectors.SelectorsBuilder()
.includeTables(tableInclusions)
.build();
return new Tuple2<>(selectors, replaceBy);
})
.collect(Collectors.toList());
}
@Override
public Event map(Event event) throws Exception {
checkState(
event instanceof ChangeEvent,
String.format(
"The input event of the route is not a ChangeEvent but with type \"%s\"",
event.getClass().getCanonicalName()));
ChangeEvent changeEvent = (ChangeEvent) event;
TableId tableId = changeEvent.tableId();
for (Tuple2<Selectors, TableId> route : routes) {
Selectors selectors = route.f0;
TableId replaceBy = route.f1;
if (selectors.isMatch(tableId)) {
return recreateChangeEvent(changeEvent, replaceBy);
}
}
return event;
}
private ChangeEvent recreateChangeEvent(ChangeEvent event, TableId tableId) {
if (event instanceof DataChangeEvent) {
return recreateDataChangeEvent(((DataChangeEvent) event), tableId);
}
if (event instanceof SchemaChangeEvent) {
return recreateSchemaChangeEvent(((SchemaChangeEvent) event), tableId);
}
throw new UnsupportedOperationException(
String.format(
"Unsupported change event with type \"%s\"",
event.getClass().getCanonicalName()));
}
private DataChangeEvent recreateDataChangeEvent(
/** Utilities for handling {@link org.apache.flink.cdc.common.event.ChangeEvent}s. */
public class ChangeEventUtils {
public static DataChangeEvent recreateDataChangeEvent(
DataChangeEvent dataChangeEvent, TableId tableId) {
switch (dataChangeEvent.op()) {
case INSERT:
@ -141,7 +54,7 @@ public class RouteFunction extends RichMapFunction<Event, Event> {
}
}
private SchemaChangeEvent recreateSchemaChangeEvent(
public static SchemaChangeEvent recreateSchemaChangeEvent(
SchemaChangeEvent schemaChangeEvent, TableId tableId) {
if (schemaChangeEvent instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;

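A hedged usage sketch of the extracted ChangeEventUtils above (originalDataChangeEvent and originalSchemaChangeEvent are placeholder variables, not from the patch): both helpers simply rebuild the event against the routed table ID.

TableId routedTable = TableId.tableId("default_namespace", "default_schema", "merged");
DataChangeEvent reroutedData =
        ChangeEventUtils.recreateDataChangeEvent(originalDataChangeEvent, routedTable);
SchemaChangeEvent reroutedSchemaChange =
        ChangeEventUtils.recreateSchemaChangeEvent(originalSchemaChangeEvent, routedTable);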
@ -40,7 +40,8 @@ public class ChangeEventAssert<SELF extends AbstractAssert<SELF, EVENT>, EVENT e
failWithActualExpectedAndMessage(
actual.tableId(),
tableId,
"Table ID of the DataChangeEvent is not as expected");
"Table ID of the %s is not as expected",
actual.getClass().getSimpleName());
}
return myself;
}

@ -32,7 +32,6 @@ import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
import org.apache.flink.cdc.composer.flink.translator.PartitioningTranslator;
import org.apache.flink.cdc.composer.flink.translator.RouteTranslator;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
import org.apache.flink.cdc.composer.flink.translator.TransformTranslator;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
@ -104,6 +103,8 @@ public class FlinkPipelineComposer implements PipelineComposer {
// Build TransformSchemaOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
stream = transformTranslator.translateSchema(stream, pipelineDef.getTransforms());
// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
new SchemaOperatorTranslator(
pipelineDef
@ -121,16 +122,12 @@ public class FlinkPipelineComposer implements PipelineComposer {
schemaOperatorIDGenerator.generate(),
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
// Build Router used to route Event
RouteTranslator routeTranslator = new RouteTranslator();
stream = routeTranslator.translate(stream, pipelineDef.getRoute());
// Build DataSink in advance as schema operator requires MetadataApplier
DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());
stream =
schemaOperatorTranslator.translate(
stream, parallelism, dataSink.getMetadataApplier());
stream, parallelism, dataSink.getMetadataApplier(), pipelineDef.getRoute());
// Build Partitioner used to shuffle Event
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.composer.flink.translator;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.runtime.operators.route.RouteFunction;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.util.List;
/** Translator used to build {@link RouteFunction}. */
public class RouteTranslator {
public DataStream<Event> translate(DataStream<Event> input, List<RouteDef> routes) {
if (routes.isEmpty()) {
return input;
}
RouteFunction.Builder routeFunctionBuilder = RouteFunction.newBuilder();
for (RouteDef route : routes) {
routeFunctionBuilder.addRoute(
route.getSourceTable(), TableId.parse(route.getSinkTable()));
}
return input.map(routeFunctionBuilder.build(), new EventTypeInfo()).name("Route");
}
}

@ -17,17 +17,23 @@
package org.apache.flink.cdc.composer.flink.translator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperatorFactory;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.util.ArrayList;
import java.util.List;
/** Translator used to build {@link SchemaOperator} for schema event process. */
@Internal
public class SchemaOperatorTranslator {
@ -41,10 +47,13 @@ public class SchemaOperatorTranslator {
}
public DataStream<Event> translate(
DataStream<Event> input, int parallelism, MetadataApplier metadataApplier) {
DataStream<Event> input,
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes) {
switch (schemaChangeBehavior) {
case EVOLVE:
return addSchemaOperator(input, parallelism, metadataApplier);
return addSchemaOperator(input, parallelism, metadataApplier, routes);
case IGNORE:
return dropSchemaChangeEvent(input, parallelism);
case EXCEPTION:
@ -61,12 +70,20 @@ public class SchemaOperatorTranslator {
}
private DataStream<Event> addSchemaOperator(
DataStream<Event> input, int parallelism, MetadataApplier metadataApplier) {
DataStream<Event> input,
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes) {
List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
for (RouteDef route : routes) {
routingRules.add(
Tuple2.of(route.getSourceTable(), TableId.parse(route.getSinkTable())));
}
SingleOutputStreamOperator<Event> stream =
input.transform(
"SchemaOperator",
new EventTypeInfo(),
new SchemaOperatorFactory(metadataApplier));
new SchemaOperatorFactory(metadataApplier, routingRules));
stream.uid(schemaOperatorUid).setParallelism(parallelism);
return stream;
}

@ -18,9 +18,21 @@
package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.definition.TransformDef;
@ -30,14 +42,17 @@ import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
@ -386,4 +401,260 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=()}");
}
@Test
void testOneToOneRouting() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup route
TableId routedTable1 = TableId.tableId("default_namespace", "default_schema", "routed1");
TableId routedTable2 = TableId.tableId("default_namespace", "default_schema", "routed2");
List<RouteDef> routeDef =
Arrays.asList(
new RouteDef(TABLE_1.toString(), routedTable1.toString(), null),
new RouteDef(TABLE_2.toString(), routedTable2.toString(), null));
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef, sinkDef, routeDef, Collections.emptyList(), pipelineConfig);
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
// Check result in ValuesDatabase
List<String> routed1Results = ValuesDatabase.getResults(routedTable1);
assertThat(routed1Results)
.contains(
"default_namespace.default_schema.routed1:col1=2;newCol3=x",
"default_namespace.default_schema.routed1:col1=3;newCol3=");
List<String> routed2Results = ValuesDatabase.getResults(routedTable2);
assertThat(routed2Results)
.contains(
"default_namespace.default_schema.routed2:col1=1;col2=1",
"default_namespace.default_schema.routed2:col1=2;col2=2",
"default_namespace.default_schema.routed2:col1=3;col2=3");
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.routed1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.routed2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[3, 3], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.routed1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed2, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed2, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed2, before=[], after=[3, 3], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=default_namespace.default_schema.routed1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.routed1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}
@Test
void testMergingWithRoute() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
Schema table1Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.primaryKey("id")
.build();
Schema table2Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT())
.physicalColumn("name", DataTypes.VARCHAR(255))
.physicalColumn("age", DataTypes.TINYINT())
.physicalColumn("description", DataTypes.STRING())
.primaryKey("id")
.build();
// Create test dataset:
// Create table 1 [id, name, age]
// Table 1: +I[1, Alice, 18]
// Table 1: +I[2, Bob, 20]
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
// Create table 2 [id, name, age]
// Table 2: +I[3, Charlie, 15, student]
// Table 2: +I[4, Donald, 25, student]
// Table 2: -D[4, Donald, 25, student]
// Rename column for table 1: name -> last_name
// Add column for table 2: gender
// Table 1: +I[5, Eliza, 24]
// Table 2: +I[6, Frank, 30, student, male]
List<Event> events = new ArrayList<>();
BinaryRecordDataGenerator table1dataGenerator =
new BinaryRecordDataGenerator(
table1Schema.getColumnDataTypes().toArray(new DataType[0]));
BinaryRecordDataGenerator table2dataGenerator =
new BinaryRecordDataGenerator(
table2Schema.getColumnDataTypes().toArray(new DataType[0]));
events.add(new CreateTableEvent(myTable1, table1Schema));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
events.add(
DataChangeEvent.updateEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
events.add(new CreateTableEvent(myTable2, table2Schema));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
3L,
BinaryStringData.fromString("Charlie"),
(byte) 15,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Donald"),
(byte) 25,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.deleteEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Donald"),
(byte) 25,
BinaryStringData.fromString("student")
})));
events.add(new RenameColumnEvent(myTable1, ImmutableMap.of("name", "last_name")));
events.add(
new AddColumnEvent(
myTable2,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("gender", DataTypes.STRING())))));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {5, BinaryStringData.fromString("Eliza"), 24})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
new BinaryRecordDataGenerator(
new DataType[] {
DataTypes.BIGINT(),
DataTypes.VARCHAR(255),
DataTypes.TINYINT(),
DataTypes.STRING(),
DataTypes.STRING()
})
.generate(
new Object[] {
6L,
BinaryStringData.fromString("Frank"),
(byte) 30,
BinaryStringData.fromString("student"),
BinaryStringData.fromString("male")
})));
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup route
TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
List<RouteDef> routeDef =
Collections.singletonList(
new RouteDef(
"default_namespace.default_schema.mytable[0-9]",
mergedTable.toString(),
null));
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef, sinkDef, routeDef, Collections.emptyList(), pipelineConfig);
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable);
assertThat(mergedTableSchema)
.isEqualTo(
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.BIGINT())
.physicalColumn("description", DataTypes.STRING())
.physicalColumn("last_name", DataTypes.STRING())
.physicalColumn("gender", DataTypes.STRING())
.primaryKey("id")
.build());
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, existedColumnName=null}]}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, nameMapping={age=BIGINT, id=BIGINT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, student], after=[], op=DELETE, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`last_name` STRING, position=LAST, existedColumnName=null}]}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, null, 24, null, Eliza, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
}
}

@ -17,17 +17,30 @@
package org.apache.flink.cdc.runtime.operators.schema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@ -40,10 +53,21 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* The operator will evolve schemas in {@link SchemaRegistry} for incoming {@link
@ -56,10 +80,17 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
private final List<Tuple2<String, TableId>> routingRules;
private transient List<Tuple2<Selectors, TableId>> routes;
private transient TaskOperatorEventGateway toCoordinator;
private transient SchemaEvolutionClient schemaEvolutionClient;
private transient LoadingCache<TableId, Schema> cachedSchemas;
public SchemaOperator() {
public SchemaOperator(List<Tuple2<String, TableId>> routingRules) {
this.routingRules = routingRules;
this.chainingStrategy = ChainingStrategy.ALWAYS;
}
@ -70,6 +101,30 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
Output<StreamRecord<Event>> output) {
super.setup(containingTask, config, output);
this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway();
routes =
routingRules.stream()
.map(
tuple2 -> {
String tableInclusions = tuple2.f0;
TableId replaceBy = tuple2.f1;
Selectors selectors =
new Selectors.SelectorsBuilder()
.includeTables(tableInclusions)
.build();
return new Tuple2<>(selectors, replaceBy);
})
.collect(Collectors.toList());
schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, getOperatorID());
cachedSchemas =
CacheBuilder.newBuilder()
.expireAfterAccess(CACHE_EXPIRE_DURATION)
.build(
new CacheLoader<TableId, Schema>() {
@Override
public Schema load(TableId tableId) {
return getLatestSchema(tableId);
}
});
}
/**
@ -78,29 +133,130 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
@Override
public void processElement(StreamRecord<Event> streamRecord) {
Event event = streamRecord.getValue();
// Schema changes
if (event instanceof SchemaChangeEvent) {
TableId tableId = ((SchemaChangeEvent) event).tableId();
LOG.info(
"Table {} received SchemaChangeEvent and start to be blocked.",
tableId.toString());
handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
// Update caches
cachedSchemas.put(tableId, getLatestSchema(tableId));
getRoutedTable(tableId)
.ifPresent(routed -> cachedSchemas.put(routed, getLatestSchema(routed)));
return;
}
output.collect(streamRecord);
// Data changes
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
TableId tableId = dataChangeEvent.tableId();
Optional<TableId> optionalRoutedTable = getRoutedTable(tableId);
if (optionalRoutedTable.isPresent()) {
output.collect(
new StreamRecord<>(
maybeFillInNullForEmptyColumns(
dataChangeEvent, optionalRoutedTable.get())));
} else {
output.collect(streamRecord);
}
}
// ----------------------------------------------------------------------------------
private DataChangeEvent maybeFillInNullForEmptyColumns(
DataChangeEvent originalEvent, TableId routedTableId) {
try {
Schema originalSchema = cachedSchemas.get(originalEvent.tableId());
Schema routedTableSchema = cachedSchemas.get(routedTableId);
if (originalSchema.equals(routedTableSchema)) {
return ChangeEventUtils.recreateDataChangeEvent(originalEvent, routedTableId);
}
switch (originalEvent.op()) {
case INSERT:
return DataChangeEvent.insertEvent(
routedTableId,
regenerateRecordData(
originalEvent.after(), originalSchema, routedTableSchema),
originalEvent.meta());
case UPDATE:
return DataChangeEvent.updateEvent(
routedTableId,
regenerateRecordData(
originalEvent.before(), originalSchema, routedTableSchema),
regenerateRecordData(
originalEvent.after(), originalSchema, routedTableSchema),
originalEvent.meta());
case DELETE:
return DataChangeEvent.deleteEvent(
routedTableId,
regenerateRecordData(
originalEvent.before(), originalSchema, routedTableSchema),
originalEvent.meta());
case REPLACE:
return DataChangeEvent.replaceEvent(
routedTableId,
regenerateRecordData(
originalEvent.after(), originalSchema, routedTableSchema),
originalEvent.meta());
default:
throw new IllegalArgumentException(
String.format(
"Unrecognized operation type \"%s\"", originalEvent.op()));
}
} catch (Exception e) {
throw new IllegalStateException("Unable to fill null for empty columns", e);
}
}
private RecordData regenerateRecordData(
RecordData recordData, Schema originalSchema, Schema routedTableSchema) {
// Regenerate record data
List<RecordData.FieldGetter> fieldGetters = new ArrayList<>();
for (Column column : routedTableSchema.getColumns()) {
String columnName = column.getName();
int columnIndex = originalSchema.getColumnNames().indexOf(columnName);
if (columnIndex == -1) {
fieldGetters.add(new NullFieldGetter());
} else {
RecordData.FieldGetter fieldGetter =
RecordData.createFieldGetter(
originalSchema.getColumn(columnName).get().getType(), columnIndex);
// Check type compatibility
if (originalSchema.getColumn(columnName).get().getType().equals(column.getType())) {
fieldGetters.add(fieldGetter);
} else {
fieldGetters.add(new TypeCoercionFieldGetter(column.getType(), fieldGetter));
}
}
}
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(
routedTableSchema.getColumnDataTypes().toArray(new DataType[0]));
return recordDataGenerator.generate(
fieldGetters.stream()
.map(fieldGetter -> fieldGetter.getFieldOrNull(recordData))
.toArray());
}
private Optional<TableId> getRoutedTable(TableId originalTableId) {
for (Tuple2<Selectors, TableId> route : routes) {
if (route.f0.isMatch(originalTableId)) {
return Optional.of(route.f1);
}
}
return Optional.empty();
}
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
// The request will need to send a FlushEvent or block until flushing finished
SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
if (response.isShouldSendFlushEvent()) {
if (!response.getSchemaChangeEvents().isEmpty()) {
LOG.info(
"Sending the FlushEvent for table {} in subtask {}.",
tableId,
getRuntimeContext().getIndexOfThisSubtask());
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
output.collect(new StreamRecord<>(schemaChangeEvent));
response.getSchemaChangeEvents().forEach(e -> output.collect(new StreamRecord<>(e)));
// The request will block until flushing finished in each sink writer
requestReleaseUpstream();
}
@ -127,4 +283,90 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
"Failed to send request to coordinator: " + request.toString(), e);
}
}
private Schema getLatestSchema(TableId tableId) {
try {
Optional<Schema> optionalSchema = schemaEvolutionClient.getLatestSchema(tableId);
if (!optionalSchema.isPresent()) {
throw new IllegalStateException(
String.format("Schema doesn't exist for table \"%s\"", tableId));
}
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table \"%s\"", tableId));
}
}
private static class NullFieldGetter implements RecordData.FieldGetter {
@Nullable
@Override
public Object getFieldOrNull(RecordData recordData) {
return null;
}
}
private static class TypeCoercionFieldGetter implements RecordData.FieldGetter {
private final DataType destinationType;
private final RecordData.FieldGetter originalFieldGetter;
public TypeCoercionFieldGetter(
DataType destinationType, RecordData.FieldGetter originalFieldGetter) {
this.destinationType = destinationType;
this.originalFieldGetter = originalFieldGetter;
}
@Nullable
@Override
public Object getFieldOrNull(RecordData recordData) {
Object originalField = originalFieldGetter.getFieldOrNull(recordData);
if (originalField == null) {
return null;
}
if (destinationType.is(DataTypeRoot.BIGINT)) {
if (originalField instanceof Byte) {
// TINYINT
return ((Byte) originalField).longValue();
} else if (originalField instanceof Short) {
// SMALLINT
return ((Short) originalField).longValue();
} else if (originalField instanceof Integer) {
// INT
return ((Integer) originalField).longValue();
} else {
throw new IllegalArgumentException(
String.format(
"Cannot fit type \"%s\" into a BIGINT column. "
+ "Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column",
originalField.getClass()));
}
} else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
if (originalField instanceof Float) {
// FLOAT
return ((Float) originalField).doubleValue();
} else {
throw new IllegalArgumentException(
String.format(
"Cannot fit type \"%s\" into a DOUBLE column. "
+ "Currently only FLOAT can be accepted by a DOUBLE column",
originalField.getClass()));
}
} else if (destinationType.is(DataTypeRoot.VARCHAR)) {
if (originalField instanceof StringData) {
return originalField;
} else {
throw new IllegalArgumentException(
String.format(
"Cannot fit type \"%s\" into a STRING column. "
+ "Currently only CHAR / VARCHAR can be accepted by a STRING column",
originalField.getClass()));
}
} else {
throw new IllegalArgumentException(
String.format(
"Column type \"%s\" doesn't support type coercion",
destinationType));
}
}
}
}
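To make the coercion rules above concrete, a small illustration (not part of the patch) of the values TypeCoercionFieldGetter produces when the routed column is wider than the source column:

Object tinyintValue = (byte) 15;                       // source column type: TINYINT
Object asBigint = ((Byte) tinyintValue).longValue();   // BIGINT destination -> 15L
Object floatValue = 1.5f;                              // source column type: FLOAT
Object asDouble = ((Float) floatValue).doubleValue();  // DOUBLE destination -> 1.5d
// StringData values pass through unchanged for VARCHAR destinations;
// any other source/destination pair throws IllegalArgumentException.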

@ -17,8 +17,10 @@
package org.apache.flink.cdc.runtime.operators.schema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
@ -27,6 +29,8 @@ import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import java.util.List;
/** Factory to create {@link SchemaOperator}. */
@Internal
public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>
@ -35,15 +39,18 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>
private static final long serialVersionUID = 1L;
private final MetadataApplier metadataApplier;
private final List<Tuple2<String, TableId>> routingRules;
public SchemaOperatorFactory(MetadataApplier metadataApplier) {
super(new SchemaOperator());
public SchemaOperatorFactory(
MetadataApplier metadataApplier, List<Tuple2<String, TableId>> routingRules) {
super(new SchemaOperator(routingRules));
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
}
@Override
public OperatorCoordinator.Provider getCoordinatorProvider(
String operatorName, OperatorID operatorID) {
return new SchemaRegistryProvider(operatorID, operatorName, metadataApplier);
return new SchemaRegistryProvider(operatorID, operatorName, metadataApplier, routingRules);
}
}

@ -0,0 +1,315 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/** Derive schema changes based on the routing rules. */
public class SchemaDerivation {
private final SchemaManager schemaManager;
private final List<Tuple2<Selectors, TableId>> routes;
private final Map<TableId, Set<TableId>> derivationMapping;
public SchemaDerivation(
SchemaManager schemaManager,
List<Tuple2<Selectors, TableId>> routes,
Map<TableId, Set<TableId>> derivationMapping) {
this.schemaManager = schemaManager;
this.routes = routes;
this.derivationMapping = derivationMapping;
}
public List<SchemaChangeEvent> applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
for (Tuple2<Selectors, TableId> route : routes) {
TableId originalTable = schemaChangeEvent.tableId();
// Check routing table
if (!route.f0.isMatch(originalTable)) {
continue;
}
// Matched a routing rule
TableId derivedTable = route.f1;
Set<TableId> originalTables =
derivationMapping.computeIfAbsent(derivedTable, t -> new HashSet<>());
originalTables.add(originalTable);
if (originalTables.size() == 1) {
// 1-to-1 mapping. Replace the table ID directly
SchemaChangeEvent derivedSchemaChangeEvent =
ChangeEventUtils.recreateSchemaChangeEvent(schemaChangeEvent, derivedTable);
schemaManager.applySchemaChange(derivedSchemaChangeEvent);
return Collections.singletonList(derivedSchemaChangeEvent);
}
// Many-to-1 mapping (merging tables)
Schema derivedTableSchema = schemaManager.getLatestSchema(derivedTable).get();
if (schemaChangeEvent instanceof CreateTableEvent) {
return handleCreateTableEvent(
(CreateTableEvent) schemaChangeEvent, derivedTableSchema, derivedTable);
} else if (schemaChangeEvent instanceof AddColumnEvent) {
return handleAddColumnEvent(
(AddColumnEvent) schemaChangeEvent, derivedTableSchema, derivedTable);
} else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
return handleAlterColumnTypeEvent(
(AlterColumnTypeEvent) schemaChangeEvent, derivedTableSchema, derivedTable);
} else if (schemaChangeEvent instanceof DropColumnEvent) {
return Collections.emptyList();
} else if (schemaChangeEvent instanceof RenameColumnEvent) {
return handleRenameColumnEvent(
(RenameColumnEvent) schemaChangeEvent, derivedTableSchema, derivedTable);
} else {
throw new IllegalStateException(
String.format(
"Unrecognized SchemaChangeEvent type: %s", schemaChangeEvent));
}
}
// No routes are matched
return Collections.singletonList(schemaChangeEvent);
}
public Map<TableId, Set<TableId>> getDerivationMapping() {
return derivationMapping;
}
public static void serializeDerivationMapping(
SchemaDerivation schemaDerivation, DataOutputStream out) throws IOException {
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
// Serialize derivation mapping in SchemaDerivation
Map<TableId, Set<TableId>> derivationMapping = schemaDerivation.getDerivationMapping();
out.writeInt(derivationMapping.size());
for (Map.Entry<TableId, Set<TableId>> entry : derivationMapping.entrySet()) {
// Routed table ID
TableId routedTableId = entry.getKey();
tableIdSerializer.serialize(routedTableId, new DataOutputViewStreamWrapper(out));
// Original table IDs
Set<TableId> originalTableIds = entry.getValue();
out.writeInt(originalTableIds.size());
for (TableId originalTableId : originalTableIds) {
tableIdSerializer.serialize(originalTableId, new DataOutputViewStreamWrapper(out));
}
}
}
public static Map<TableId, Set<TableId>> deserializerDerivationMapping(DataInputStream in)
throws IOException {
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
int derivationMappingSize = in.readInt();
Map<TableId, Set<TableId>> derivationMapping = new HashMap<>(derivationMappingSize);
for (int i = 0; i < derivationMappingSize; i++) {
// Routed table ID
TableId routedTableId =
tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
// Original table IDs
int numOriginalTables = in.readInt();
Set<TableId> originalTableIds = new HashSet<>(numOriginalTables);
for (int j = 0; j < numOriginalTables; j++) {
TableId originalTableId =
tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
originalTableIds.add(originalTableId);
}
derivationMapping.put(routedTableId, originalTableIds);
}
return derivationMapping;
}
private List<SchemaChangeEvent> handleRenameColumnEvent(
RenameColumnEvent renameColumnEvent, Schema derivedTableSchema, TableId derivedTable) {
List<AddColumnEvent.ColumnWithPosition> newColumns = new ArrayList<>();
renameColumnEvent
.getNameMapping()
.forEach(
(before, after) -> {
if (derivedTableSchema.getColumn(after).isPresent()) {
return;
}
Column existedColumn = derivedTableSchema.getColumn(before).get();
newColumns.add(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
after,
existedColumn.getType(),
existedColumn.getComment())));
});
List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
if (!newColumns.isEmpty()) {
AddColumnEvent derivedSchemaChangeEvent = new AddColumnEvent(derivedTable, newColumns);
schemaChangeEvents.add(derivedSchemaChangeEvent);
}
schemaChangeEvents.forEach(schemaManager::applySchemaChange);
return schemaChangeEvents;
}
private List<SchemaChangeEvent> handleAlterColumnTypeEvent(
AlterColumnTypeEvent alterColumnTypeEvent,
Schema derivedTableSchema,
TableId derivedTable) {
Map<String, DataType> typeDifference = new HashMap<>();
alterColumnTypeEvent
.getTypeMapping()
.forEach(
(columnName, dataType) -> {
Column existedColumnInDerivedTable =
derivedTableSchema.getColumn(columnName).get();
if (!existedColumnInDerivedTable.getType().equals(dataType)) {
// Check type compatibility
DataType widerType =
getWiderType(
existedColumnInDerivedTable.getType(), dataType);
if (!widerType.equals(existedColumnInDerivedTable.getType())) {
typeDifference.put(
existedColumnInDerivedTable.getName(), widerType);
}
}
});
List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
if (!typeDifference.isEmpty()) {
AlterColumnTypeEvent derivedSchemaChangeEvent =
new AlterColumnTypeEvent(derivedTable, typeDifference);
schemaChangeEvents.add(derivedSchemaChangeEvent);
}
schemaChangeEvents.forEach(schemaManager::applySchemaChange);
return schemaChangeEvents;
}
private List<SchemaChangeEvent> handleAddColumnEvent(
AddColumnEvent addColumnEvent, Schema derivedTableSchema, TableId derivedTable) {
List<AddColumnEvent.ColumnWithPosition> newColumns = new ArrayList<>();
Map<String, DataType> newTypeMapping = new HashMap<>();
// Check if new column already existed in the derived table
for (AddColumnEvent.ColumnWithPosition addedColumn : addColumnEvent.getAddedColumns()) {
Optional<Column> optionalColumnInDerivedTable =
derivedTableSchema.getColumn(addedColumn.getAddColumn().getName());
if (!optionalColumnInDerivedTable.isPresent()) {
// Non-existed column. Use AddColumn
newColumns.add(new AddColumnEvent.ColumnWithPosition(addedColumn.getAddColumn()));
} else {
// Existed column. Check type compatibility
Column existedColumnInDerivedTable = optionalColumnInDerivedTable.get();
if (!existedColumnInDerivedTable
.getType()
.equals(addedColumn.getAddColumn().getType())) {
DataType widerType =
getWiderType(
existedColumnInDerivedTable.getType(),
addedColumn.getAddColumn().getType());
if (!widerType.equals(existedColumnInDerivedTable.getType())) {
newTypeMapping.put(existedColumnInDerivedTable.getName(), widerType);
}
}
}
}
List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
if (!newColumns.isEmpty()) {
schemaChangeEvents.add(new AddColumnEvent(derivedTable, newColumns));
}
if (!newTypeMapping.isEmpty()) {
schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping));
}
schemaChangeEvents.forEach(schemaManager::applySchemaChange);
return schemaChangeEvents;
}
private List<SchemaChangeEvent> handleCreateTableEvent(
CreateTableEvent createTableEvent, Schema derivedTableSchema, TableId derivedTable) {
List<AddColumnEvent.ColumnWithPosition> newColumns = new ArrayList<>();
Map<String, DataType> newTypeMapping = new HashMap<>();
// Check if there are any columns that don't exist in the derived table
// and perform add-column for the missing columns.
for (Column column : createTableEvent.getSchema().getColumns()) {
Optional<Column> optionalColumnInDerivedTable =
derivedTableSchema.getColumn(column.getName());
if (!optionalColumnInDerivedTable.isPresent()) {
// Non-existed column. Use AddColumn
newColumns.add(new AddColumnEvent.ColumnWithPosition(column));
} else {
// Existed column. Check type compatibility
Column existedColumnInDerivedTable = optionalColumnInDerivedTable.get();
if (!existedColumnInDerivedTable.getType().equals(column.getType())) {
DataType widerType =
getWiderType(existedColumnInDerivedTable.getType(), column.getType());
if (!widerType.equals(existedColumnInDerivedTable.getType())) {
newTypeMapping.put(existedColumnInDerivedTable.getName(), widerType);
}
}
}
}
List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
if (!newColumns.isEmpty()) {
schemaChangeEvents.add(new AddColumnEvent(derivedTable, newColumns));
}
if (!newTypeMapping.isEmpty()) {
schemaChangeEvents.add(new AlterColumnTypeEvent(derivedTable, newTypeMapping));
}
schemaChangeEvents.forEach(schemaManager::applySchemaChange);
return schemaChangeEvents;
}
private DataType getWiderType(DataType thisType, DataType thatType) {
if (thisType.equals(thatType)) {
return thisType;
}
if (thisType.is(DataTypeFamily.INTEGER_NUMERIC)
&& thatType.is(DataTypeFamily.INTEGER_NUMERIC)) {
return DataTypes.BIGINT();
}
if (thisType.is(DataTypeFamily.CHARACTER_STRING)
&& thatType.is(DataTypeFamily.CHARACTER_STRING)) {
return DataTypes.STRING();
}
if (thisType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& thatType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
return DataTypes.DOUBLE();
}
throw new IllegalStateException(
String.format("Incompatible types: \"%s\" and \"%s\"", thisType, thatType));
}
}
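A hedged end-to-end sketch of the derivation above, reusing myTable1 / myTable2 / table1Schema / table2Schema from FlinkPipelineComposerITCase (the expected derived events mirror the output asserted there):

SchemaManager schemaManager = new SchemaManager();
List<Tuple2<Selectors, TableId>> routes =
        Collections.singletonList(
                Tuple2.of(
                        new Selectors.SelectorsBuilder()
                                .includeTables("default_namespace.default_schema.mytable[0-9]")
                                .build(),
                        TableId.tableId("default_namespace", "default_schema", "merged")));
SchemaDerivation derivation = new SchemaDerivation(schemaManager, routes, new HashMap<>());

// First source table: only one original table so far, so the CreateTableEvent is
// re-targeted to "merged" as-is.
derivation.applySchemaChange(new CreateTableEvent(myTable1, table1Schema));

// Second source table: now a many-to-1 merge. Expected derived events:
// AddColumnEvent{`description` STRING} and AlterColumnTypeEvent{id -> BIGINT, age -> BIGINT}.
List<SchemaChangeEvent> derived =
        derivation.applySchemaChange(new CreateTableEvent(myTable2, table2Schema));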

@ -17,7 +17,9 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
@ -43,7 +45,9 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
@ -82,22 +86,30 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
/** Metadata applier for applying schema changes to external system. */
private final MetadataApplier metadataApplier;
private final List<Tuple2<Selectors, TableId>> routes;
/** The request handler that handle all requests and events. */
private SchemaRegistryRequestHandler requestHandler;
/** Schema manager for tracking schemas of all tables. */
private SchemaManager schemaManager = new SchemaManager();
private SchemaDerivation schemaDerivation;
public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
MetadataApplier metadataApplier) {
MetadataApplier metadataApplier,
List<Tuple2<Selectors, TableId>> routes) {
this.context = context;
this.operatorName = operatorName;
this.failedReasons = new HashMap<>();
this.metadataApplier = metadataApplier;
this.routes = routes;
schemaManager = new SchemaManager();
requestHandler = new SchemaRegistryRequestHandler(metadataApplier, schemaManager);
schemaDerivation = new SchemaDerivation(schemaManager, routes, new HashMap<>());
requestHandler =
new SchemaRegistryRequestHandler(metadataApplier, schemaManager, schemaDerivation);
}
@Override
@ -141,6 +153,8 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
byte[] serializedSchemaManager = SchemaManager.SERIALIZER.serialize(schemaManager);
out.writeInt(serializedSchemaManager.length);
out.write(serializedSchemaManager);
// Serialize SchemaDerivation mapping
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
resultFuture.complete(baos.toByteArray());
}
}
@ -181,7 +195,12 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
schemaManager =
SchemaManager.SERIALIZER.deserialize(
schemaManagerSerializerVersion, serializedSchemaManager);
requestHandler = new SchemaRegistryRequestHandler(metadataApplier, schemaManager);
Map<TableId, Set<TableId>> derivationMapping =
SchemaDerivation.deserializerDerivationMapping(in);
schemaDerivation = new SchemaDerivation(schemaManager, routes, derivationMapping);
requestHandler =
new SchemaRegistryRequestHandler(
metadataApplier, schemaManager, schemaDerivation);
}
}

@ -17,11 +17,17 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import java.util.List;
import java.util.stream.Collectors;
/** Provider of {@link SchemaRegistry}. */
@Internal
public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
@ -30,12 +36,17 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
private final OperatorID operatorID;
private final String operatorName;
private final MetadataApplier metadataApplier;
private final List<Tuple2<String, TableId>> routingRules;
public SchemaRegistryProvider(
OperatorID operatorID, String operatorName, MetadataApplier metadataApplier) {
OperatorID operatorID,
String operatorName,
MetadataApplier metadataApplier,
List<Tuple2<String, TableId>> routingRules) {
this.operatorID = operatorID;
this.operatorName = operatorName;
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
}
@Override
@ -45,6 +56,19 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
@Override
public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
return new SchemaRegistry(operatorName, context, metadataApplier);
List<Tuple2<Selectors, TableId>> routes =
routingRules.stream()
.map(
tuple2 -> {
String tableInclusions = tuple2.f0;
TableId replaceBy = tuple2.f1;
Selectors selectors =
new Selectors.SelectorsBuilder()
.includeTables(tableInclusions)
.build();
return new Tuple2<>(selectors, replaceBy);
})
.collect(Collectors.toList());
return new SchemaRegistry(operatorName, context, metadataApplier, routes);
}
}

@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@ -55,6 +56,8 @@ public class SchemaRegistryRequestHandler {
/** Schema manager holding schema for all tables. */
private final SchemaManager schemaManager;
private final SchemaDerivation schemaDerivation;
/**
* Not applied SchemaChangeRequest before receiving all flush success events for its table from
* sink writers.
@ -64,12 +67,15 @@ public class SchemaRegistryRequestHandler {
private final Set<Integer> flushedSinkWriters;
public SchemaRegistryRequestHandler(
MetadataApplier metadataApplier, SchemaManager schemaManager) {
MetadataApplier metadataApplier,
SchemaManager schemaManager,
SchemaDerivation schemaDerivation) {
this.metadataApplier = metadataApplier;
this.activeSinkWriters = new HashSet<>();
this.flushedSinkWriters = new HashSet<>();
this.pendingSchemaChanges = new LinkedList<>();
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
}
/**
@ -96,13 +102,22 @@ public class SchemaRegistryRequestHandler {
request.getTableId().toString());
if (request.getSchemaChangeEvent() instanceof CreateTableEvent
&& schemaManager.schemaExists(request.getTableId())) {
return CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(false)));
return CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(Collections.emptyList())));
}
CompletableFuture<CoordinationResponse> response =
CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true)));
schemaManager.applySchemaChange(request.getSchemaChangeEvent());
pendingSchemaChanges.add(new PendingSchemaChange(request, response));
pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
List<SchemaChangeEvent> derivedSchemaChangeEvents =
schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
CompletableFuture<CoordinationResponse> response =
CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
if (!derivedSchemaChangeEvents.isEmpty()) {
PendingSchemaChange pendingSchemaChange =
new PendingSchemaChange(request, response);
pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents;
pendingSchemaChanges.add(pendingSchemaChange);
pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
}
return response;
} else {
LOG.info("There are already processing requests. Wait for processing.");
@ -147,7 +162,8 @@ public class SchemaRegistryRequestHandler {
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
waitFlushSuccess.derivedSchemaChangeEvents.forEach(
schemaChangeEvent -> applySchemaChange(tableId, schemaChangeEvent));
waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));
if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
@ -166,21 +182,27 @@ public class SchemaRegistryRequestHandler {
&& schemaManager.schemaExists(request.getTableId())) {
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(false)));
.complete(wrap(new SchemaChangeResponse(Collections.emptyList())));
pendingSchemaChanges.remove(0);
} else {
schemaManager.applySchemaChange(request.getSchemaChangeEvent());
List<SchemaChangeEvent> derivedSchemaChangeEvents =
schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(true)));
pendingSchemaChange.startToWaitForReleaseRequest();
break;
.complete(wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
if (!derivedSchemaChangeEvents.isEmpty()) {
pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents;
pendingSchemaChange.startToWaitForReleaseRequest();
break;
}
}
}
}
private static class PendingSchemaChange {
private final SchemaChangeRequest changeRequest;
private List<SchemaChangeEvent> derivedSchemaChangeEvents;
private CompletableFuture<CoordinationResponse> responseFuture;
private RequestStatus status;

@ -17,10 +17,12 @@
package org.apache.flink.cdc.runtime.operators.schema.event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import java.util.List;
import java.util.Objects;
/**
@ -34,14 +36,14 @@ public class SchemaChangeResponse implements CoordinationResponse {
* Whether the SchemaOperator needs to buffer data and the SchemaOperatorCoordinator needs to wait
* for flushing.
*/
private final boolean shouldSendFlushEvent;
private final List<SchemaChangeEvent> schemaChangeEvents;
public SchemaChangeResponse(boolean shouldSendFlushEvent) {
this.shouldSendFlushEvent = shouldSendFlushEvent;
public SchemaChangeResponse(List<SchemaChangeEvent> schemaChangeEvents) {
this.schemaChangeEvents = schemaChangeEvents;
}
public boolean isShouldSendFlushEvent() {
return shouldSendFlushEvent;
public List<SchemaChangeEvent> getSchemaChangeEvents() {
return schemaChangeEvents;
}
@Override
@ -53,11 +55,11 @@ public class SchemaChangeResponse implements CoordinationResponse {
return false;
}
SchemaChangeResponse response = (SchemaChangeResponse) o;
return shouldSendFlushEvent == response.shouldSendFlushEvent;
return schemaChangeEvents.equals(response.schemaChangeEvents);
}
@Override
public int hashCode() {
return Objects.hash(shouldSendFlushEvent);
return Objects.hash(schemaChangeEvents);
}
}
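SchemaChangeResponse now carries the derived schema change events instead of a boolean flag; an empty list takes over the role of the old shouldSendFlushEvent == false. A minimal sketch of how a caller could interpret the new shape (the helper class and method are illustrative, not part of this patch):

import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;

import java.util.List;

// Sketch only: a non-empty list of derived events means the sink has to be flushed and the
// events applied; an empty list means there is nothing to do downstream.
final class SchemaChangeResponseSketch {
    static boolean requiresFlush(SchemaChangeResponse response) {
        List<SchemaChangeEvent> derivedEvents = response.getSchemaChangeEvents();
        return !derivedEvents.isEmpty();
    }
}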

@ -1,195 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.route;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat;
class RouteFunctionTest {
private static final TableId CUSTOMERS =
TableId.tableId("my_company", "my_branch", "customers");
private static final TableId NEW_CUSTOMERS =
TableId.tableId("my_new_company", "my_new_branch", "customers");
private static final Schema CUSTOMERS_SCHEMA =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("phone", DataTypes.BIGINT())
.primaryKey("id")
.build();
@Test
void testDataChangeEventRouting() throws Exception {
RouteFunction router =
RouteFunction.newBuilder()
.addRoute("my_company.\\.+.customers", NEW_CUSTOMERS)
.build();
router.open(new Configuration());
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(((RowType) CUSTOMERS_SCHEMA.toRowDataType()));
// Insert
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
CUSTOMERS,
recordDataGenerator.generate(
new Object[] {1, new BinaryStringData("Alice"), 12345678L}));
assertThat(router.map(insertEvent))
.asDataChangeEvent()
.hasTableId(NEW_CUSTOMERS)
.hasOperationType(OperationType.INSERT)
.withAfterRecordData()
.hasArity(3)
.withSchema(CUSTOMERS_SCHEMA)
.hasFields(1, new BinaryStringData("Alice"), 12345678L);
// Update
DataChangeEvent updateEvent =
DataChangeEvent.updateEvent(
CUSTOMERS,
recordDataGenerator.generate(
new Object[] {1, new BinaryStringData("Alice"), 12345678L}),
recordDataGenerator.generate(
new Object[] {1, new BinaryStringData("Alice"), 87654321L}));
DataChangeEvent mappedUpdateEvent = (DataChangeEvent) router.map(updateEvent);
assertThat(mappedUpdateEvent)
.hasTableId(NEW_CUSTOMERS)
.hasOperationType(OperationType.UPDATE);
assertThat(mappedUpdateEvent.before())
.withSchema(CUSTOMERS_SCHEMA)
.hasFields(1, new BinaryStringData("Alice"), 12345678L);
assertThat(mappedUpdateEvent.after())
.withSchema(CUSTOMERS_SCHEMA)
.hasFields(1, new BinaryStringData("Alice"), 87654321L);
// Replace
DataChangeEvent replaceEvent =
DataChangeEvent.replaceEvent(
CUSTOMERS,
recordDataGenerator.generate(
new Object[] {1, new BinaryStringData("Bob"), 87654321L}));
assertThat(router.map(replaceEvent))
.asDataChangeEvent()
.hasTableId(NEW_CUSTOMERS)
.hasOperationType(OperationType.REPLACE)
.withAfterRecordData()
.hasArity(3)
.withSchema(CUSTOMERS_SCHEMA)
.hasFields(1, new BinaryStringData("Bob"), 87654321L);
// Delete
DataChangeEvent deleteEvent =
DataChangeEvent.deleteEvent(
CUSTOMERS,
recordDataGenerator.generate(
new Object[] {1, new BinaryStringData("Bob"), 87654321L}));
assertThat(router.map(deleteEvent))
.asDataChangeEvent()
.hasTableId(NEW_CUSTOMERS)
.hasOperationType(OperationType.DELETE)
.withBeforeRecordData()
.hasArity(3)
.withSchema(CUSTOMERS_SCHEMA)
.hasFields(1, new BinaryStringData("Bob"), 87654321L);
}
@Test
void testSchemaChangeEventRouting() throws Exception {
RouteFunction router =
RouteFunction.newBuilder()
.addRoute("\\.+_company.\\.+_branch.customers", NEW_CUSTOMERS)
.build();
router.open(new Configuration());
// CreateTableEvent
CreateTableEvent createTableEvent = new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA);
assertThat(router.map(createTableEvent))
.asSchemaChangeEvent()
.hasTableId(NEW_CUSTOMERS)
.asCreateTableEvent()
.hasSchema(CUSTOMERS_SCHEMA);
// AddColumnEvent
AddColumnEvent.ColumnWithPosition newColumn =
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("address", DataTypes.STRING()),
AddColumnEvent.ColumnPosition.LAST,
null);
AddColumnEvent addColumnEvent =
new AddColumnEvent(CUSTOMERS, Collections.singletonList(newColumn));
assertThat(router.map(addColumnEvent))
.asSchemaChangeEvent()
.asAddColumnEvent()
.hasTableId(NEW_CUSTOMERS)
.containsAddedColumns(newColumn);
// DropColumnEvent
PhysicalColumn droppedColumn = Column.physicalColumn("address", DataTypes.STRING());
List<String> droppedColumns = Collections.singletonList(droppedColumn.getName());
DropColumnEvent dropColumnEvent = new DropColumnEvent(CUSTOMERS, droppedColumns);
assertThat(router.map(dropColumnEvent))
.asSchemaChangeEvent()
.asDropColumnEvent()
.containsDroppedColumns(droppedColumn.getName())
.hasTableId(NEW_CUSTOMERS);
// RenameColumnEvent
Map<String, String> columnRenaming = ImmutableMap.of("phone", "mobile");
RenameColumnEvent renameColumnEvent = new RenameColumnEvent(CUSTOMERS, columnRenaming);
assertThat(router.map(renameColumnEvent))
.asSchemaChangeEvent()
.asRenameColumnEvent()
.containsNameMapping(columnRenaming)
.hasTableId(NEW_CUSTOMERS);
// AlterColumnTypeEvent
Map<String, DataType> typeMapping = ImmutableMap.of("mobile", DataTypes.STRING());
AlterColumnTypeEvent alterColumnTypeEvent =
new AlterColumnTypeEvent(CUSTOMERS, typeMapping);
assertThat(router.map(alterColumnTypeEvent))
.asSchemaChangeEvent()
.asAlterColumnTypeEvent()
.containsTypeMapping(typeMapping)
.hasTableId(NEW_CUSTOMERS);
}
}

@ -97,7 +97,7 @@ public class SchemaOperatorTest {
int maxParallelism, int parallelism, int subtaskIndex, OperatorID opID)
throws Exception {
return new OneInputStreamOperatorTestHarness<>(
new SchemaOperator(),
new SchemaOperator(new ArrayList<>()),
maxParallelism,
parallelism,
subtaskIndex,
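SchemaOperator now receives the routing rules at construction time, which is why the test harness above passes an empty list. A sketch of building the operator with one merge route; the element type of the list (pattern string plus sink TableId) is inferred from the route parsing elsewhere in this PR and should be treated as an assumption:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;

import java.util.ArrayList;
import java.util.List;

// Sketch only: the routing-rule element type below is assumed, not taken from this diff.
final class SchemaOperatorConstructionSketch {
    static SchemaOperator buildWithOneRoute() {
        List<Tuple2<String, TableId>> routingRules = new ArrayList<>();
        routingRules.add(
                Tuple2.of(
                        "mydb.myschema.mytable[0-9]",
                        TableId.tableId("mydb", "myschema", "mytables")));
        return new SchemaOperator(routingRules);
    }
}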

@ -0,0 +1,365 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Unit test for {@link SchemaDerivation}. */
class SchemaDerivationTest {
private static final TableId TABLE_1 = TableId.tableId("mydb", "myschema", "mytable1");
private static final TableId TABLE_2 = TableId.tableId("mydb", "myschema", "mytable2");
private static final TableId MERGED_TABLE = TableId.tableId("mydb", "myschema", "mytables");
private static final Schema SCHEMA =
Schema.newBuilder()
.column(Column.physicalColumn("id", DataTypes.BIGINT()))
.column(Column.physicalColumn("name", DataTypes.STRING()))
.column(Column.physicalColumn("age", DataTypes.INT()))
.build();
private static final Schema COMPATIBLE_SCHEMA =
Schema.newBuilder()
.column(Column.physicalColumn("id", DataTypes.BIGINT()))
.column(Column.physicalColumn("name", DataTypes.STRING()))
.column(Column.physicalColumn("age", DataTypes.BIGINT()))
.column(Column.physicalColumn("gender", DataTypes.STRING()))
.build();
private static final Schema INCOMPATIBLE_SCHEMA =
Schema.newBuilder()
.column(Column.physicalColumn("id", DataTypes.BIGINT()))
.column(Column.physicalColumn("name", DataTypes.STRING()))
.column(Column.physicalColumn("age", DataTypes.STRING()))
.column(Column.physicalColumn("gender", DataTypes.STRING()))
.build();
private static final List<Tuple2<Selectors, TableId>> ROUTES =
Collections.singletonList(
Tuple2.of(
new Selectors.SelectorsBuilder()
.includeTables("mydb.myschema.mytable[0-9]")
.build(),
MERGED_TABLE));
@Test
void testOneToOneMapping() {
SchemaDerivation schemaDerivation =
new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>());
// Create table
List<SchemaChangeEvent> derivedChangesAfterCreateTable =
schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA));
assertThat(derivedChangesAfterCreateTable).hasSize(1);
assertThat(derivedChangesAfterCreateTable.get(0))
.asCreateTableEvent()
.hasTableId(MERGED_TABLE)
.hasSchema(SCHEMA);
// Add column
AddColumnEvent.ColumnWithPosition newCol1 =
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("new_col1", DataTypes.STRING(), null));
AddColumnEvent.ColumnWithPosition newCol2 =
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("new_col2", DataTypes.STRING(), null));
List<AddColumnEvent.ColumnWithPosition> newColumns = Arrays.asList(newCol1, newCol2);
List<SchemaChangeEvent> derivedChangesAfterAddColumn =
schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_1, newColumns));
assertThat(derivedChangesAfterAddColumn).hasSize(1);
assertThat(derivedChangesAfterAddColumn.get(0))
.asAddColumnEvent()
.hasTableId(MERGED_TABLE)
.containsAddedColumns(newCol1, newCol2);
// Alter column type
ImmutableMap<String, DataType> typeMapping = ImmutableMap.of("age", DataTypes.BIGINT());
List<SchemaChangeEvent> derivedChangesAfterAlterTableType =
schemaDerivation.applySchemaChange(new AlterColumnTypeEvent(TABLE_1, typeMapping));
assertThat(derivedChangesAfterAlterTableType).hasSize(1);
assertThat(derivedChangesAfterAlterTableType.get(0))
.asAlterColumnTypeEvent()
.hasTableId(MERGED_TABLE)
.containsTypeMapping(typeMapping);
// Drop column
List<String> droppedColumns = Arrays.asList("new_col1", "new_col2");
List<SchemaChangeEvent> derivedChangesAfterDropColumn =
schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_1, droppedColumns));
assertThat(derivedChangesAfterDropColumn).hasSize(1);
assertThat(derivedChangesAfterDropColumn.get(0))
.asDropColumnEvent()
.hasTableId(MERGED_TABLE)
.containsDroppedColumns("new_col1", "new_col2");
// Rename column
Map<String, String> renamedColumns = ImmutableMap.of("name", "last_name");
List<SchemaChangeEvent> derivedChangesAfterRenameColumn =
schemaDerivation.applySchemaChange(new RenameColumnEvent(TABLE_1, renamedColumns));
assertThat(derivedChangesAfterRenameColumn).hasSize(1);
assertThat(derivedChangesAfterRenameColumn.get(0))
.asRenameColumnEvent()
.hasTableId(MERGED_TABLE)
.containsNameMapping(renamedColumns);
}
@Test
void testMergingTablesWithExactSameSchema() {
SchemaDerivation schemaDerivation =
new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>());
// Create table 1
List<SchemaChangeEvent> derivedChangesAfterCreateTable =
schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA));
assertThat(derivedChangesAfterCreateTable).hasSize(1);
assertThat(derivedChangesAfterCreateTable.get(0))
.asCreateTableEvent()
.hasTableId(MERGED_TABLE)
.hasSchema(SCHEMA);
// Create table 2
assertThat(schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_2, SCHEMA)))
.isEmpty();
// Add column for table 1
AddColumnEvent.ColumnWithPosition newCol1 =
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("new_col1", DataTypes.STRING(), null));
AddColumnEvent.ColumnWithPosition newCol2 =
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("new_col2", DataTypes.STRING(), null));
List<AddColumnEvent.ColumnWithPosition> newColumns = Arrays.asList(newCol1, newCol2);
List<SchemaChangeEvent> derivedChangesAfterAddColumn =
schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_1, newColumns));
assertThat(derivedChangesAfterAddColumn).hasSize(1);
assertThat(derivedChangesAfterAddColumn.get(0))
.asAddColumnEvent()
.hasTableId(MERGED_TABLE)
.containsAddedColumns(newCol1, newCol2);
// Add column for table 2
assertThat(schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_2, newColumns)))
.isEmpty();
// Alter column type for table 1
ImmutableMap<String, DataType> typeMapping = ImmutableMap.of("age", DataTypes.BIGINT());
List<SchemaChangeEvent> derivedChangesAfterAlterColumnType =
schemaDerivation.applySchemaChange(new AlterColumnTypeEvent(TABLE_1, typeMapping));
assertThat(derivedChangesAfterAlterColumnType).hasSize(1);
assertThat(derivedChangesAfterAlterColumnType.get(0))
.asAlterColumnTypeEvent()
.hasTableId(MERGED_TABLE)
.containsTypeMapping(typeMapping);
// Alter column type for table 2
assertThat(
schemaDerivation.applySchemaChange(
new AlterColumnTypeEvent(TABLE_2, typeMapping)))
.isEmpty();
// Drop column for table 1
List<String> droppedColumns = Arrays.asList("new_col1", "new_col2");
assertThat(schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_1, droppedColumns)))
.isEmpty();
// Drop column for table 2
assertThat(schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_2, droppedColumns)))
.isEmpty();
// Rename column for table 1
Map<String, String> renamedColumns = ImmutableMap.of("name", "last_name");
List<SchemaChangeEvent> derivedChangesAfterRenameColumn =
schemaDerivation.applySchemaChange(new RenameColumnEvent(TABLE_1, renamedColumns));
assertThat(derivedChangesAfterRenameColumn).hasSize(1);
assertThat(derivedChangesAfterRenameColumn.get(0))
.asAddColumnEvent()
.hasTableId(MERGED_TABLE)
.containsAddedColumns(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("last_name", DataTypes.STRING(), null)));
// Rename column for table 2
assertThat(
schemaDerivation.applySchemaChange(
new RenameColumnEvent(TABLE_2, renamedColumns)))
.isEmpty();
}
@Test
void testMergingTableWithDifferentSchemas() {
SchemaManager schemaManager = new SchemaManager();
SchemaDerivation schemaDerivation =
new SchemaDerivation(schemaManager, ROUTES, new HashMap<>());
// Create table 1
List<SchemaChangeEvent> derivedChangesAfterCreateTable =
schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA));
assertThat(derivedChangesAfterCreateTable).hasSize(1);
assertThat(derivedChangesAfterCreateTable.get(0))
.asCreateTableEvent()
.hasTableId(MERGED_TABLE)
.hasSchema(SCHEMA);
// Create table 2
List<SchemaChangeEvent> derivedChangesAfterCreateTable2 =
schemaDerivation.applySchemaChange(
new CreateTableEvent(TABLE_2, COMPATIBLE_SCHEMA));
assertThat(derivedChangesAfterCreateTable2).hasSize(2);
assertThat(derivedChangesAfterCreateTable2)
.containsExactlyInAnyOrder(
new AddColumnEvent(
MERGED_TABLE,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
"gender", DataTypes.STRING(), null)))),
new AlterColumnTypeEvent(
MERGED_TABLE, ImmutableMap.of("age", DataTypes.BIGINT())));
// Add column for table 1
AddColumnEvent.ColumnWithPosition newCol1 =
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("new_col1", DataTypes.VARCHAR(255), null));
AddColumnEvent.ColumnWithPosition newCol2 =
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("new_col2", DataTypes.VARCHAR(255), null));
List<AddColumnEvent.ColumnWithPosition> newColumns = Arrays.asList(newCol1, newCol2);
List<SchemaChangeEvent> derivedChangesAfterAddColumn =
schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_1, newColumns));
assertThat(derivedChangesAfterAddColumn).hasSize(1);
assertThat(derivedChangesAfterAddColumn.get(0))
.asAddColumnEvent()
.hasTableId(MERGED_TABLE)
.containsAddedColumns(newCol1, newCol2);
// Add column for table 2
List<SchemaChangeEvent> derivedChangesAfterAddColumnForTable2 =
schemaDerivation.applySchemaChange(
new AddColumnEvent(
TABLE_2,
Arrays.asList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
"new_col1", DataTypes.STRING(), null)),
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn(
"new_col2", DataTypes.STRING(), null)))));
assertThat(derivedChangesAfterAddColumnForTable2).hasSize(1);
assertThat(derivedChangesAfterAddColumnForTable2.get(0))
.asAlterColumnTypeEvent()
.containsTypeMapping(
ImmutableMap.of(
"new_col1", DataTypes.STRING(), "new_col2", DataTypes.STRING()));
// Alter column type for table 1
ImmutableMap<String, DataType> typeMapping = ImmutableMap.of("age", DataTypes.BIGINT());
List<SchemaChangeEvent> derivedChangesAfterAlterColumnType =
schemaDerivation.applySchemaChange(new AlterColumnTypeEvent(TABLE_1, typeMapping));
assertThat(derivedChangesAfterAlterColumnType).isEmpty();
// Alter column type for table 2
List<SchemaChangeEvent> derivedChangesAfterAlterColumnTypeForTable2 =
schemaDerivation.applySchemaChange(
new AlterColumnTypeEvent(
TABLE_2, ImmutableMap.of("age", DataTypes.TINYINT())));
assertThat(derivedChangesAfterAlterColumnTypeForTable2).isEmpty();
// Drop column for table 1
List<String> droppedColumns = Arrays.asList("new_col1", "new_col2");
assertThat(schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_1, droppedColumns)))
.isEmpty();
// Drop column for table 2
assertThat(schemaDerivation.applySchemaChange(new DropColumnEvent(TABLE_2, droppedColumns)))
.isEmpty();
// Rename column for table 1
Map<String, String> renamedColumns = ImmutableMap.of("name", "last_name");
List<SchemaChangeEvent> derivedChangesAfterRenameColumn =
schemaDerivation.applySchemaChange(new RenameColumnEvent(TABLE_1, renamedColumns));
assertThat(derivedChangesAfterRenameColumn).hasSize(1);
assertThat(derivedChangesAfterRenameColumn.get(0))
.asAddColumnEvent()
.hasTableId(MERGED_TABLE)
.containsAddedColumns(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("last_name", DataTypes.STRING(), null)));
// Rename column for table 2
List<SchemaChangeEvent> derivedChangesAfterRenameColumnForTable2 =
schemaDerivation.applySchemaChange(
new RenameColumnEvent(TABLE_2, ImmutableMap.of("name", "first_name")));
assertThat(derivedChangesAfterRenameColumnForTable2).hasSize(1);
assertThat(derivedChangesAfterRenameColumnForTable2.get(0))
.asAddColumnEvent()
.hasTableId(MERGED_TABLE)
.containsAddedColumns(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("first_name", DataTypes.STRING(), null)));
assertThat(schemaManager.getLatestSchema(MERGED_TABLE))
.contains(
Schema.newBuilder()
.column(Column.physicalColumn("id", DataTypes.BIGINT()))
.column(Column.physicalColumn("name", DataTypes.STRING()))
.column(Column.physicalColumn("age", DataTypes.BIGINT()))
.column(Column.physicalColumn("gender", DataTypes.STRING()))
.column(Column.physicalColumn("new_col1", DataTypes.STRING()))
.column(Column.physicalColumn("new_col2", DataTypes.STRING()))
.column(Column.physicalColumn("last_name", DataTypes.STRING()))
.column(Column.physicalColumn("first_name", DataTypes.STRING()))
.build());
}
@Test
void testIncompatibleTypes() {
SchemaDerivation schemaDerivation =
new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>());
// Create table 1
List<SchemaChangeEvent> derivedChangesAfterCreateTable =
schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA));
assertThat(derivedChangesAfterCreateTable).hasSize(1);
assertThat(derivedChangesAfterCreateTable.get(0))
.asCreateTableEvent()
.hasTableId(MERGED_TABLE)
.hasSchema(SCHEMA);
// Create table 2
assertThatThrownBy(
() ->
schemaDerivation.applySchemaChange(
new CreateTableEvent(TABLE_2, INCOMPATIBLE_SCHEMA)))
.isInstanceOf(IllegalStateException.class)
.hasMessage("Incompatible types: \"INT\" and \"STRING\"");
}
}
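The tests above pin down the merging rules: the first matching table creates the merged table, later tables only emit the delta needed to fit (new columns as AddColumnEvent, widened types as AlterColumnTypeEvent, renamed columns surfacing as additional columns), and incompatible column types fail fast. A short usage sketch of SchemaDerivation with the same fixture values (class and variable names are illustrative):

package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;

// Sketch only: mirrors the ROUTES fixture used by the test above.
final class SchemaDerivationUsageSketch {
    static List<SchemaChangeEvent> deriveForFirstTable() {
        TableId mergedTable = TableId.tableId("mydb", "myschema", "mytables");
        List<Tuple2<Selectors, TableId>> routes =
                Collections.singletonList(
                        Tuple2.of(
                                new Selectors.SelectorsBuilder()
                                        .includeTables("mydb.myschema.mytable[0-9]")
                                        .build(),
                                mergedTable));
        Schema schema =
                Schema.newBuilder()
                        .column(Column.physicalColumn("id", DataTypes.BIGINT()))
                        .column(Column.physicalColumn("name", DataTypes.STRING()))
                        .build();
        SchemaDerivation derivation =
                new SchemaDerivation(new SchemaManager(), routes, new HashMap<>());
        // The first table matching the route yields a CreateTableEvent for the merged table.
        return derivation.applySchemaChange(
                new CreateTableEvent(TableId.tableId("mydb", "myschema", "mytable1"), schema));
    }
}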

@ -41,6 +41,7 @@ import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.util.OutputTag;
import java.util.Collections;
import java.util.LinkedList;
/**
@ -72,7 +73,8 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
new CollectingMetadataApplier());
new CollectingMetadataApplier(),
Collections.emptyList());
schemaRegistryGateway = new TestingSchemaRegistryGateway(schemaRegistry);
}
