[FLINK-36981][transform] Considering sharding tables with different schema in transform projection

This closes #3826.
Branch: pull/3846/head
Wink committed 3 weeks ago (via GitHub)
parent 486ee6f805
commit 49dc957ac6

@@ -54,7 +54,6 @@ import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -495,8 +494,9 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}");
}
@Test
void testOneToOneRouting() throws Exception {
@ParameterizedTest
@EnumSource
void testOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
@@ -510,6 +510,7 @@ class FlinkPipelineComposerITCase {
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup route
@@ -570,8 +571,9 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}
@Test
void testIdenticalOneToOneRouting() throws Exception {
@ParameterizedTest
@EnumSource
void testIdenticalOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
@@ -585,6 +587,7 @@ class FlinkPipelineComposerITCase {
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup route
@@ -645,8 +648,9 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}
@Test
void testMergingWithRoute() throws Exception {
@ParameterizedTest
@EnumSource
void testMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
@@ -678,7 +682,7 @@ class FlinkPipelineComposerITCase {
// Table 1: +I[1, Alice, 18]
// Table 1: +I[2, Bob, 20]
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
// Create table 2 [id, name, age]
// Create table 2 [id, name, age, description]
// Table 2: +I[3, Charlie, 15, student]
// Table 2: +I[4, Donald, 25, student]
// Table 2: -D[4, Donald, 25, student]
@@ -782,6 +786,7 @@ class FlinkPipelineComposerITCase {
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup route
@@ -841,8 +846,9 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
}
@Test
void testTransformMergingWithRoute() throws Exception {
@ParameterizedTest
@EnumSource
void testTransformMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
@@ -978,6 +984,7 @@ class FlinkPipelineComposerITCase {
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup transform
@@ -1049,6 +1056,216 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
}
@ParameterizedTest
@EnumSource
void testTransformMergingWithRouteChangeOrder(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
Schema table1Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.primaryKey("id")
.build();
Schema table2Schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT())
.physicalColumn("name", DataTypes.VARCHAR(255))
.physicalColumn("age", DataTypes.TINYINT())
.physicalColumn("description", DataTypes.STRING())
.primaryKey("id")
.build();
// Create test dataset:
// Create table 1 [id, name, age]
// Create table 2 [id, name, age, description]
// Table 1: +I[1, Alice, 18]
// Table 1: +I[2, Bob, 20]
// Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
// Table 2: +I[3, Charlie, 15, student]
// Table 2: +I[4, Donald, 25, student]
// Table 2: -D[4, Donald, 25, student]
// Rename column for table 1: name -> last_name
// Add column for table 2: gender
// Table 1: +I[5, Eliza, 24]
// Table 2: +I[6, Frank, 30, student, male]
List<Event> events = new ArrayList<>();
BinaryRecordDataGenerator table1dataGenerator =
new BinaryRecordDataGenerator(
table1Schema.getColumnDataTypes().toArray(new DataType[0]));
BinaryRecordDataGenerator table2dataGenerator =
new BinaryRecordDataGenerator(
table2Schema.getColumnDataTypes().toArray(new DataType[0]));
events.add(new CreateTableEvent(myTable1, table1Schema));
events.add(new CreateTableEvent(myTable2, table2Schema));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
events.add(
DataChangeEvent.updateEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
table1dataGenerator.generate(
new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
3L,
BinaryStringData.fromString("Charlie"),
(byte) 15,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Donald"),
(byte) 25,
BinaryStringData.fromString("student")
})));
events.add(
DataChangeEvent.deleteEvent(
myTable2,
table2dataGenerator.generate(
new Object[] {
4L,
BinaryStringData.fromString("Donald"),
(byte) 25,
BinaryStringData.fromString("student")
})));
// events.add(new RenameColumnEvent(myTable1, ImmutableMap.of("name", "last_name")));
events.add(
new AddColumnEvent(
myTable2,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("gender", DataTypes.STRING())))));
events.add(
DataChangeEvent.insertEvent(
myTable1,
table1dataGenerator.generate(
new Object[] {5, BinaryStringData.fromString("Eliza"), 24})));
events.add(
DataChangeEvent.insertEvent(
myTable2,
new BinaryRecordDataGenerator(
new DataType[] {
DataTypes.BIGINT(),
DataTypes.VARCHAR(255),
DataTypes.TINYINT(),
DataTypes.STRING(),
DataTypes.STRING()
})
.generate(
new Object[] {
6L,
BinaryStringData.fromString("Frank"),
(byte) 30,
BinaryStringData.fromString("student"),
BinaryStringData.fromString("male")
})));
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup transform
List<TransformDef> transformDef =
Collections.singletonList(
new TransformDef(
"default_namespace.default_schema.mytable[0-9]",
"*,'last_name' as last_name",
null,
null,
null,
null,
"",
null));
// Setup route
TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
List<RouteDef> routeDef =
Collections.singletonList(
new RouteDef(
"default_namespace.default_schema.mytable[0-9]",
mergedTable.toString(),
null,
null));
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
routeDef,
transformDef,
Collections.emptyList(),
pipelineConfig);
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable);
assertThat(mergedTableSchema)
.isEqualTo(
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.physicalColumn("last_name", DataTypes.STRING())
.physicalColumn("description", DataTypes.STRING())
.physicalColumn("gender", DataTypes.STRING())
.primaryKey("id")
.build());
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name, null], after=[2, Bob, 30, last_name, null], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=AFTER, existedColumnName=description}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, Eliza, 24, last_name, null, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
}
@ParameterizedTest
@EnumSource
void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception {

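For context on the test changes above: replacing @Test with @ParameterizedTest plus a bare @EnumSource makes JUnit 5 run each of these routing and merging tests once per constant of ValuesDataSink.SinkApi (the enum type is inferred from the method parameter), and the chosen constant is forwarded to the sink through ValuesDataSinkOptions.SINK_API, so every sink API variant is exercised against the new transform behavior. A minimal, self-contained sketch of the same pattern, using a hypothetical SinkApi enum rather than the real ValuesDataSink.SinkApi and assuming junit-jupiter-params is on the classpath:

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import static org.junit.jupiter.api.Assertions.assertNotNull;

class EnumSourceSketchTest {

    // Hypothetical stand-in for ValuesDataSink.SinkApi; the real enum's constants are not
    // reproduced here.
    enum SinkApi { VARIANT_A, VARIANT_B }

    // A bare @EnumSource infers the enum type from the parameter (JUnit 5.4+) and invokes
    // the test once per constant, so each routing test runs once per sink API.
    @ParameterizedTest
    @EnumSource
    void runsOncePerConstant(SinkApi sinkApi) {
        assertNotNull(sinkApi);
    }
}
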
@@ -81,7 +81,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
private List<UserDefinedFunctionDescriptor> udfDescriptors;
private transient Map<String, Object> udfFunctionInstances;
private transient Map<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor>
private transient Map<Tuple2<TableId, String>, TransformProjectionProcessor>
transformProjectionProcessorMap;
private transient Map<Tuple2<TableId, TransformFilter>, TransformFilterProcessor>
transformFilterProcessorMap;
@@ -355,12 +355,14 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
for (PostTransformer transform : transforms) {
Selectors selectors = transform.getSelectors();
if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
TransformProjection transformProjection = transform.getProjection().get();
TransformProjection transformProjection =
TransformProjection.of(transform.getProjection().get().getProjection())
.get();
if (transformProjection.isValid()) {
if (!transformProjectionProcessorMap.containsKey(
Tuple2.of(tableId, transformProjection))) {
Tuple2.of(tableId, transformProjection.getProjection()))) {
transformProjectionProcessorMap.put(
Tuple2.of(tableId, transformProjection),
Tuple2.of(tableId, transformProjection.getProjection()),
TransformProjectionProcessor.of(
transformProjection,
timezone,
@@ -370,7 +372,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
}
TransformProjectionProcessor postTransformProcessor =
transformProjectionProcessorMap.get(
Tuple2.of(tableId, transformProjection));
Tuple2.of(tableId, transformProjection.getProjection()));
// update the columns of projection and add the column of projection into Schema
newSchemas.add(
postTransformProcessor.processSchema(
@@ -434,12 +436,9 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
&& transformProjectionOptional.get().isValid()) {
TransformProjection transformProjection = transformProjectionOptional.get();
if (!transformProjectionProcessorMap.containsKey(
Tuple2.of(tableId, transformProjection))
|| !transformProjectionProcessorMap
.get(Tuple2.of(tableId, transformProjection))
.hasTableInfo()) {
Tuple2.of(tableId, transformProjection.getProjection()))) {
transformProjectionProcessorMap.put(
Tuple2.of(tableId, transformProjection),
Tuple2.of(tableId, transformProjection.getProjection()),
TransformProjectionProcessor.of(
tableInfo,
transformProjection,
@@ -447,10 +446,26 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
udfDescriptors,
getUdfFunctionInstances(),
transform.getSupportedMetadataColumns()));
} else if (!transformProjectionProcessorMap
.get(Tuple2.of(tableId, transformProjection.getProjection()))
.hasTableInfo()) {
TransformProjectionProcessor transformProjectionProcessorWithoutTableInfo =
transformProjectionProcessorMap.get(
Tuple2.of(tableId, transformProjection.getProjection()));
transformProjectionProcessorMap.put(
Tuple2.of(tableId, transformProjection.getProjection()),
TransformProjectionProcessor.of(
tableInfo,
transformProjectionProcessorWithoutTableInfo
.getTransformProjection(),
timezone,
udfDescriptors,
getUdfFunctionInstances(),
transform.getSupportedMetadataColumns()));
}
TransformProjectionProcessor postTransformProcessor =
transformProjectionProcessorMap.get(
Tuple2.of(tableId, transformProjection));
Tuple2.of(tableId, transformProjection.getProjection()));
dataChangeEventOptional =
processProjection(
postTransformProcessor,

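The core of the PostTransformOperator change above is the cache key for transformProjectionProcessorMap: it is now keyed by Tuple2<TableId, String> (the projection expression text) instead of Tuple2<TableId, TransformProjection>, and a fresh TransformProjection is built per table via TransformProjection.of(...). Per the commit title, this keeps sharded tables with different schemas that match the same transform rule from sharing a single projection processor and its per-table column state. A minimal, self-contained sketch of that caching pattern, with illustrative stand-ins for the flink-cdc types (not the actual operator code; Java 16+ records are used for brevity):

import java.util.HashMap;
import java.util.Map;

public class ProjectionCacheSketch {

    // Simplified stand-ins for TableId and TransformProjectionProcessor.
    record TableId(String name) {}
    record CacheKey(TableId tableId, String projection) {}
    record Processor(String projection, String perTableSchema) {}

    // Keyed by the immutable projection string, mirroring the operator's
    // Map<Tuple2<TableId, String>, TransformProjectionProcessor>.
    private final Map<CacheKey, Processor> cache = new HashMap<>();

    Processor processorFor(TableId tableId, String projection, String schema) {
        // Each (table, projection-text) pair gets its own processor, so column state
        // derived from one shard's schema never leaks into another shard that matches
        // the same transform rule.
        return cache.computeIfAbsent(
                new CacheKey(tableId, projection), k -> new Processor(projection, schema));
    }

    public static void main(String[] args) {
        ProjectionCacheSketch sketch = new ProjectionCacheSketch();
        Processor p1 =
                sketch.processorFor(
                        new TableId("mytable1"), "*, 'x' AS tag", "id INT, name STRING, age INT");
        Processor p2 =
                sketch.processorFor(
                        new TableId("mytable2"),
                        "*, 'x' AS tag",
                        "id BIGINT, name VARCHAR(255), age TINYINT, description STRING");
        // Two shards, same rule text, two independent processors.
        System.out.println(p1 == p2); // false
    }
}
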
@@ -17,14 +17,12 @@
package org.apache.flink.cdc.runtime.operators.transform;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.utils.StringUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* The projection of transform applies to describe a projection of filtering tables. Projection
@@ -42,6 +40,11 @@ public class TransformProjection implements Serializable {
private String projection;
private List<ProjectionColumn> projectionColumns;
public TransformProjection(String projection) {
this.projection = projection;
this.projectionColumns = new ArrayList<>();
}
public TransformProjection(String projection, List<ProjectionColumn> projectionColumns) {
this.projection = projection;
this.projectionColumns = projectionColumns;
@@ -69,10 +72,4 @@
}
return Optional.of(new TransformProjection(projection, new ArrayList<>()));
}
public List<Column> getAllColumnList() {
return projectionColumns.stream()
.map(ProjectionColumn::getColumn)
.collect(Collectors.toList());
}
}

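The TransformProjection changes pair with the operator change: the new single-argument constructor backs TransformProjection.of(...), which hands out a fresh instance whose projectionColumns list starts out empty, and getAllColumnList() is removed, presumably because the resolved column list is per-table state that now lives in the processor rather than in the shared rule object. A small usage sketch, assuming the flink-cdc-runtime classes are on the classpath; the sketch class itself is hypothetical and placed in the same package to avoid visibility assumptions:

package org.apache.flink.cdc.runtime.operators.transform;

import java.util.Optional;

class TransformProjectionUsageSketch {
    public static void main(String[] args) {
        // of(...) wraps the expression in a fresh TransformProjection whose column list
        // starts out empty for every table; a blank expression is expected to yield
        // Optional.empty().
        Optional<TransformProjection> projection = TransformProjection.of("*, 'x' AS tag");
        projection.map(TransformProjection::getProjection).ifPresent(System.out::println);
    }
}
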
@@ -81,6 +81,10 @@ public class TransformProjectionProcessor {
return this.postTransformChangeInfo != null;
}
public TransformProjection getTransformProjection() {
return transformProjection;
}
public static TransformProjectionProcessor of(
PostTransformChangeInfo tableInfo,
TransformProjection transformProjection,
