diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index c48f94402..6d77bcfcd 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -54,7 +54,6 @@ import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -495,8 +494,9 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}");
     }
 
-    @Test
-    void testOneToOneRouting() throws Exception {
+    @ParameterizedTest
+    @EnumSource
+    void testOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -510,6 +510,7 @@ class FlinkPipelineComposerITCase {
         // Setup value sink
         Configuration sinkConfig = new Configuration();
         sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
         SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
 
         // Setup route
@@ -570,8 +571,9 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
     }
 
-    @Test
-    void testIdenticalOneToOneRouting() throws Exception {
+    @ParameterizedTest
+    @EnumSource
+    void testIdenticalOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -585,6 +587,7 @@ class FlinkPipelineComposerITCase {
         // Setup value sink
         Configuration sinkConfig = new Configuration();
         sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
         SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
 
         // Setup route
@@ -645,8 +648,9 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
     }
 
-    @Test
-    void testMergingWithRoute() throws Exception {
+    @ParameterizedTest
+    @EnumSource
+    void testMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -678,7 +682,7 @@ class FlinkPipelineComposerITCase {
         // Table 1: +I[1, Alice, 18]
         // Table 1: +I[2, Bob, 20]
         // Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
-        // Create table 2 [id, name, age]
+        // Create table 2 [id, name, age, description]
         // Table 2: +I[3, Charlie, 15, student]
         // Table 2: +I[4, Donald, 25, student]
         // Table 2: -D[4, Donald, 25, student]
@@ -782,6 +786,7 @@ class FlinkPipelineComposerITCase {
         // Setup value sink
         Configuration sinkConfig = new Configuration();
         sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
         SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
 
         // Setup route
@@ -841,8 +846,9 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
     }
 
-    @Test
-    void testTransformMergingWithRoute() throws Exception {
+    @ParameterizedTest
+    @EnumSource
+    void testTransformMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -978,6 +984,7 @@ class FlinkPipelineComposerITCase {
         // Setup value sink
         Configuration sinkConfig = new Configuration();
         sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
         SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
 
         // Setup transform
@@ -1049,6 +1056,216 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testTransformMergingWithRouteChangeOrder(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
+        Schema table1Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .physicalColumn("age", DataTypes.INT())
+                        .primaryKey("id")
+                        .build();
+        Schema table2Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT())
+                        .physicalColumn("name", DataTypes.VARCHAR(255))
+                        .physicalColumn("age", DataTypes.TINYINT())
+                        .physicalColumn("description", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        // Create test dataset:
+        // Create table 1 [id, name, age]
+        // Create table 2 [id, name, age, description]
+        // Table 1: +I[1, Alice, 18]
+        // Table 1: +I[2, Bob, 20]
+        // Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
+        // Table 2: +I[3, Charlie, 15, student]
+        // Table 2: +I[4, Donald, 25, student]
+        // Table 2: -D[4, Donald, 25, student]
+        // Rename column for table 1: name -> last_name
+        // Add column for table 2: gender
+        // Table 1: +I[5, Eliza, 24]
+        // Table 2: +I[6, Frank, 30, student, male]
+        List<Event> events = new ArrayList<>();
+        BinaryRecordDataGenerator table1dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table1Schema.getColumnDataTypes().toArray(new DataType[0]));
+        BinaryRecordDataGenerator table2dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table2Schema.getColumnDataTypes().toArray(new DataType[0]));
+        events.add(new CreateTableEvent(myTable1, table1Schema));
+        events.add(new CreateTableEvent(myTable2, table2Schema));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
+        events.add(
+                DataChangeEvent.updateEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
+                        table1dataGenerator.generate(
+                                new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    3L,
+                                    BinaryStringData.fromString("Charlie"),
+                                    (byte) 15,
+                                    BinaryStringData.fromString("student")
+                                })));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    4L,
+                                    BinaryStringData.fromString("Donald"),
+                                    (byte) 25,
+                                    BinaryStringData.fromString("student")
+                                })));
+        events.add(
+                DataChangeEvent.deleteEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    4L,
+                                    BinaryStringData.fromString("Donald"),
+                                    (byte) 25,
+                                    BinaryStringData.fromString("student")
+                                })));
+        // events.add(new RenameColumnEvent(myTable1, ImmutableMap.of("name", "last_name")));
+        events.add(
+                new AddColumnEvent(
+                        myTable2,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        Column.physicalColumn("gender", DataTypes.STRING())))));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {5, BinaryStringData.fromString("Eliza"), 24})));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        new BinaryRecordDataGenerator(
+                                        new DataType[] {
+                                            DataTypes.BIGINT(),
+                                            DataTypes.VARCHAR(255),
+                                            DataTypes.TINYINT(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING()
+                                        })
+                                .generate(
+                                        new Object[] {
+                                            6L,
+                                            BinaryStringData.fromString("Frank"),
+                                            (byte) 30,
+                                            BinaryStringData.fromString("student"),
+                                            BinaryStringData.fromString("male")
+                                        })));
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup transform
+        List<TransformDef> transformDef =
+                Collections.singletonList(
+                        new TransformDef(
+                                "default_namespace.default_schema.mytable[0-9]",
+                                "*,'last_name' as last_name",
+                                null,
+                                null,
+                                null,
+                                null,
+                                "",
+                                null));
+
+        // Setup route
+        TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
+        List<RouteDef> routeDef =
+                Collections.singletonList(
+                        new RouteDef(
+                                "default_namespace.default_schema.mytable[0-9]",
+                                mergedTable.toString(),
+                                null,
+                                null));
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        routeDef,
+                        transformDef,
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+        Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable);
+        assertThat(mergedTableSchema)
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.BIGINT())
+                                .physicalColumn("name", DataTypes.STRING())
+                                .physicalColumn("age", DataTypes.INT())
+                                .physicalColumn("last_name", DataTypes.STRING())
+                                .physicalColumn("description", DataTypes.STRING())
+                                .physicalColumn("gender", DataTypes.STRING())
+                                .primaryKey("id")
+                                .build());
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
+                        "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}",
+                        "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name, null], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name, null], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name, null], after=[2, Bob, 30, last_name, null], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",
+                        "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=AFTER, existedColumnName=description}]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, Eliza, 24, last_name, null, null], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
+    }
+
     @ParameterizedTest
     @EnumSource
     void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index c565b5610..c7c69fa6c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -81,7 +81,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
     private List<UserDefinedFunctionDescriptor> udfDescriptors;
     private transient Map<String, Object> udfFunctionInstances;
-    private transient Map<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor>
+    private transient Map<Tuple2<TableId, String>, TransformProjectionProcessor>
             transformProjectionProcessorMap;
     private transient Map<Tuple2<TableId, TransformFilter>, TransformFilterProcessor>
             transformFilterProcessorMap;
@@ -355,12 +355,14 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
         for (PostTransformer transform : transforms) {
             Selectors selectors = transform.getSelectors();
             if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
-                TransformProjection transformProjection = transform.getProjection().get();
+                TransformProjection transformProjection =
+                        TransformProjection.of(transform.getProjection().get().getProjection())
+                                .get();
                 if (transformProjection.isValid()) {
                     if (!transformProjectionProcessorMap.containsKey(
-                            Tuple2.of(tableId, transformProjection))) {
+                            Tuple2.of(tableId, transformProjection.getProjection()))) {
                         transformProjectionProcessorMap.put(
-                                Tuple2.of(tableId, transformProjection),
+                                Tuple2.of(tableId, transformProjection.getProjection()),
                                 TransformProjectionProcessor.of(
                                         transformProjection,
                                         timezone,
@@ -370,7 +372,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                 }
                 TransformProjectionProcessor postTransformProcessor =
                         transformProjectionProcessorMap.get(
-                                Tuple2.of(tableId, transformProjection));
+                                Tuple2.of(tableId, transformProjection.getProjection()));
                 // update the columns of projection and add the column of projection into Schema
                 newSchemas.add(
                         postTransformProcessor.processSchema(
@@ -434,12 +436,9 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                     && transformProjectionOptional.get().isValid()) {
                 TransformProjection transformProjection = transformProjectionOptional.get();
                 if (!transformProjectionProcessorMap.containsKey(
-                        Tuple2.of(tableId, transformProjection))
-                        || !transformProjectionProcessorMap
-                                .get(Tuple2.of(tableId, transformProjection))
-                                .hasTableInfo()) {
+                        Tuple2.of(tableId, transformProjection.getProjection()))) {
                     transformProjectionProcessorMap.put(
-                            Tuple2.of(tableId, transformProjection),
+                            Tuple2.of(tableId, transformProjection.getProjection()),
                             TransformProjectionProcessor.of(
                                     tableInfo,
                                     transformProjection,
@@ -447,10 +446,26 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                                     udfDescriptors,
                                     getUdfFunctionInstances(),
                                     transform.getSupportedMetadataColumns()));
+                } else if (!transformProjectionProcessorMap
+                        .get(Tuple2.of(tableId, transformProjection.getProjection()))
+                        .hasTableInfo()) {
+                    TransformProjectionProcessor transformProjectionProcessorWithoutTableInfo =
+                            transformProjectionProcessorMap.get(
+                                    Tuple2.of(tableId, transformProjection.getProjection()));
+                    transformProjectionProcessorMap.put(
+                            Tuple2.of(tableId, transformProjection.getProjection()),
+                            TransformProjectionProcessor.of(
+                                    tableInfo,
+                                    transformProjectionProcessorWithoutTableInfo
+                                            .getTransformProjection(),
+                                    timezone,
+                                    udfDescriptors,
+                                    getUdfFunctionInstances(),
+                                    transform.getSupportedMetadataColumns()));
                 }
                 TransformProjectionProcessor postTransformProcessor =
                         transformProjectionProcessorMap.get(
-                                Tuple2.of(tableId, transformProjection));
+                                Tuple2.of(tableId, transformProjection.getProjection()));
                 dataChangeEventOptional =
                         processProjection(
                                 postTransformProcessor,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjection.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjection.java
index 94abba117..3d15e78aa 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjection.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjection.java
@@ -17,14 +17,12 @@
 
 package org.apache.flink.cdc.runtime.operators.transform;
 
-import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.utils.StringUtils;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * The projection of transform applies to describe a projection of filtering tables. Projection
@@ -42,6 +40,11 @@ public class TransformProjection implements Serializable {
     private String projection;
     private List<ProjectionColumn> projectionColumns;
 
+    public TransformProjection(String projection) {
+        this.projection = projection;
+        this.projectionColumns = new ArrayList<>();
+    }
+
     public TransformProjection(String projection, List<ProjectionColumn> projectionColumns) {
         this.projection = projection;
         this.projectionColumns = projectionColumns;
@@ -69,10 +72,4 @@ public class TransformProjection implements Serializable {
         }
         return Optional.of(new TransformProjection(projection, new ArrayList<>()));
     }
-
-    public List<Column> getAllColumnList() {
-        return projectionColumns.stream()
-                .map(ProjectionColumn::getColumn)
-                .collect(Collectors.toList());
-    }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
index dc8c099d6..8ac1cbe56 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
@@ -81,6 +81,10 @@ public class TransformProjectionProcessor {
         return this.postTransformChangeInfo != null;
     }
 
+    public TransformProjection getTransformProjection() {
+        return transformProjection;
+    }
+
     public static TransformProjectionProcessor of(
             PostTransformChangeInfo tableInfo,
             TransformProjection transformProjection,
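Note on the key change above (an illustration, not part of the patch): transformProjectionProcessorMap was previously keyed by Tuple2<TableId, TransformProjection>. Since the operator now rebuilds a TransformProjection per lookup via TransformProjection.of(...), and the class as shown here does not appear to override equals()/hashCode(), object-valued keys fall back to identity and would never hit the cached processor. Keying by the immutable projection expression String keeps repeated lookups for the same table and projection stable. A minimal, self-contained Java sketch of that difference, using a hypothetical Projection stand-in rather than the real CDC classes:

    import java.util.HashMap;
    import java.util.Map;

    public class CacheKeySketch {
        // Stand-in for TransformProjection: no equals()/hashCode() override, so identity equality.
        static final class Projection {
            final String expression;

            Projection(String expression) {
                this.expression = expression;
            }
        }

        public static void main(String[] args) {
            Map<Projection, String> byObject = new HashMap<>();
            byObject.put(new Projection("*,'last_name' as last_name"), "processor");
            // A logically identical key is a different object under identity equality -> prints null.
            System.out.println(byObject.get(new Projection("*,'last_name' as last_name")));

            Map<String, String> byString = new HashMap<>();
            byString.put(new Projection("*,'last_name' as last_name").expression, "processor");
            // String keys compare by value -> prints "processor".
            System.out.println(byString.get(new Projection("*,'last_name' as last_name").expression));
        }
    }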