diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index a5f0924c3..f867b7d08 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -50,9 +50,9 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量 -## 示例 +## 单数据源示例 -从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下: +单数据源,从单个 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下: ```yaml source: @@ -77,6 +77,44 @@ pipeline: parallelism: 4 ``` +## 多数据源示例 + +多数据源,从多个 MySQL 数据源读取数据同步到 Doris 的 Pipeline 可以定义如下: + +```yaml +sources: + - type: mysql + name: MySQL multiple Source1 + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5400-5404 + server-time-zone: Asia/Shanghai + + - type: mysql + name: MySQL multiple Source2 + hostname: 127.0.0.2 + port: 3307 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5405-5409 + server-time-zone: Asia/Shanghai + +sink: + type: doris + name: Doris Sink + fenodes: 127.0.0.1:8030 + username: root + password: pass + +pipeline: + name: MySQL to Doris Pipeline + parallelism: 4 +``` + ## 连接器配置项
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 3cc5c17a1..f7e904148 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -51,9 +51,9 @@ You may need to configure the following dependencies manually, and pass it with
-## Example +## Single Data Source Example -An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows: +An example of the pipeline for reading data from a single MySQL data source and sink to Doris can be defined as follows: ```yaml source: @@ -78,6 +78,44 @@ pipeline: parallelism: 4 ``` +## Multiple Data Sources Example + +An example of the pipeline for reading data from multiple MySQL data sources and sink to Doris can be defined as follows: + +```yaml +sources: + - type: mysql + name: MySQL multiple Source1 + hostname: 127.0.0.1 + port: 3306 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5400-5404 + server-time-zone: Asia/Shanghai + + - type: mysql + name: MySQL multiple Source2 + hostname: 127.0.0.2 + port: 3307 + username: admin + password: pass + tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.* + server-id: 5405-5409 + server-time-zone: Asia/Shanghai + +sink: + type: doris + name: Doris Sink + fenodes: 127.0.0.1:8030 + username: root + password: pass + +pipeline: + name: MySQL to Doris Pipeline + parallelism: 4 +``` + ## Connector Options
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java index b594aa35b..b33d30777 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -40,6 +40,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -55,6 +56,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { // Parent node keys private static final String SOURCE_KEY = "source"; + private static final String MULTIPLE_SOURCE_KEY = "sources"; private static final String SINK_KEY = "sink"; private static final String ROUTE_KEY = "route"; private static final String TRANSFORM_KEY = "transform"; @@ -63,6 +65,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { // Source / sink keys private static final String TYPE_KEY = "type"; + private static final String SOURCES = "sources"; private static final String NAME_KEY = "name"; private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes"; private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes"; @@ -135,13 +138,20 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { SchemaChangeBehavior schemaChangeBehavior = userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR); - // Source is required - SourceDef sourceDef = - toSourceDef( - checkNotNull( - pipelineDefJsonNode.get(SOURCE_KEY), - "Missing required field \"%s\" in pipeline definition", - SOURCE_KEY)); + JsonNode multipleSourceNode = 
pipelineDefJsonNode.get(MULTIPLE_SOURCE_KEY); + List sourceDefs = new ArrayList<>(); + SourceDef sourceDef = null; + if (multipleSourceNode != null) { + Iterator it = multipleSourceNode.elements(); + while (it.hasNext()) { + JsonNode sourceNode = it.next(); + getSourceDefs(sourceNode, sourceDefs); + } + } else { + JsonNode sourceNode = pipelineDefJsonNode.get(SOURCE_KEY); + // Source is required + sourceDef = getSourceDefs(sourceNode, sourceDefs); + } // Sink is required SinkDef sinkDef = @@ -171,7 +181,25 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { pipelineConfig.addAll(userPipelineConfig); return new PipelineDef( - sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig); + sourceDefs, + sourceDef, + sinkDef, + routeDefs, + transformDefs, + udfDefs, + modelDefs, + pipelineConfig); + } + + private SourceDef getSourceDefs(JsonNode root, List sourceDefs) { + SourceDef sourceDef = + toSourceDef( + checkNotNull( + root, + "Missing required field \"%s\" in pipeline definition", + SOURCE_KEY)); + sourceDefs.add(sourceDef); + return sourceDef; } private SourceDef toSourceDef(JsonNode sourceNode) { diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 27cbbb1de..6364f65ff 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -38,9 +38,11 @@ import java.net.URL; import java.nio.file.Paths; import java.time.Duration; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Set; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; @@ -278,6 
+280,7 @@ class YamlPipelineDefinitionParserTest { assertThat(pipelineDef) .isEqualTo( new PipelineDef( + null, new SourceDef("foo", null, new Configuration()), new SinkDef("bar", null, new Configuration(), expected), Collections.emptyList(), @@ -290,25 +293,36 @@ class YamlPipelineDefinitionParserTest { .build()))); } + @Test + void testMultipleSourceDefinition() throws Exception { + URL resource = Resources.getResource("definitions/multiple_source_mtdbak.yaml"); + YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration()); + assertThat(pipelineDef).isInstanceOf(PipelineDef.class); + } + + SourceDef sourceDef = + new SourceDef( + "mysql", + "source-database", + Configuration.fromMap( + ImmutableMap.builder() + .put("host", "localhost") + .put("port", "3306") + .put("username", "admin") + .put("password", "pass") + .put( + "tables", + "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*") + .put("chunk-column", "app_order_.*:id,web_order:product_id") + .put("capture-new-tables", "true") + .build())); + List sourceDefs = new ArrayList<>(Arrays.asList(new SourceDef[] {sourceDef})); + private final PipelineDef fullDef = new PipelineDef( - new SourceDef( - "mysql", - "source-database", - Configuration.fromMap( - ImmutableMap.builder() - .put("host", "localhost") - .put("port", "3306") - .put("username", "admin") - .put("password", "pass") - .put( - "tables", - "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*") - .put( - "chunk-column", - "app_order_.*:id,web_order:product_id") - .put("capture-new-tables", "true") - .build())), + null, + sourceDef, new SinkDef( "kafka", "sink-queue", @@ -426,25 +440,27 @@ class YamlPipelineDefinitionParserTest { assertThat(pipelineDef).isEqualTo(fullDef); } + SourceDef fullsourceDef = + new SourceDef( + "mysql", + "source-database", + Configuration.fromMap( + ImmutableMap.builder() + .put("host", "localhost") + .put("port", "3306") + 
.put("username", "admin") + .put("password", "pass") + .put( + "tables", + "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*") + .put("chunk-column", "app_order_.*:id,web_order:product_id") + .put("capture-new-tables", "true") + .build())); + private final PipelineDef fullDefWithGlobalConf = new PipelineDef( - new SourceDef( - "mysql", - "source-database", - Configuration.fromMap( - ImmutableMap.builder() - .put("host", "localhost") - .put("port", "3306") - .put("username", "admin") - .put("password", "pass") - .put( - "tables", - "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*") - .put( - "chunk-column", - "app_order_.*:id,web_order:product_id") - .put("capture-new-tables", "true") - .build())), + null, + fullsourceDef, new SinkDef( "kafka", "sink-queue", @@ -504,21 +520,25 @@ class YamlPipelineDefinitionParserTest { .put("schema-operator.rpc-timeout", "1 h") .build())); + SourceDef defSourceDef = + new SourceDef( + "mysql", + null, + Configuration.fromMap( + ImmutableMap.builder() + .put("host", "localhost") + .put("port", "3306") + .put("username", "admin") + .put("password", "pass") + .put( + "tables", + "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*") + .build())); + private final PipelineDef defWithOptional = new PipelineDef( - new SourceDef( - "mysql", - null, - Configuration.fromMap( - ImmutableMap.builder() - .put("host", "localhost") - .put("port", "3306") - .put("username", "admin") - .put("password", "pass") - .put( - "tables", - "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*") - .build())), + null, + defSourceDef, new SinkDef( "kafka", null, @@ -545,9 +565,12 @@ class YamlPipelineDefinitionParserTest { .put("parallelism", "4") .build())); + SourceDef mysqlSourceDef = new SourceDef("mysql", null, new Configuration()); + private final PipelineDef minimizedDef = new PipelineDef( - new SourceDef("mysql", null, new Configuration()), + null, + mysqlSourceDef, new SinkDef( "kafka", null, @@ -565,25 +588,27 @@ class YamlPipelineDefinitionParserTest { 
Collections.singletonMap( "local-time-zone", ZoneId.systemDefault().toString()))); + SourceDef routeRepSymDef = + new SourceDef( + "mysql", + "source-database", + Configuration.fromMap( + ImmutableMap.builder() + .put("host", "localhost") + .put("port", "3306") + .put("username", "admin") + .put("password", "pass") + .put( + "tables", + "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*") + .put("chunk-column", "app_order_.*:id,web_order:product_id") + .put("capture-new-tables", "true") + .build())); + private final PipelineDef fullDefWithRouteRepSym = new PipelineDef( - new SourceDef( - "mysql", - "source-database", - Configuration.fromMap( - ImmutableMap.builder() - .put("host", "localhost") - .put("port", "3306") - .put("username", "admin") - .put("password", "pass") - .put( - "tables", - "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*") - .put( - "chunk-column", - "app_order_.*:id,web_order:product_id") - .put("capture-new-tables", "true") - .build())), + null, + routeRepSymDef, new SinkDef( "kafka", "sink-queue", @@ -633,6 +658,7 @@ class YamlPipelineDefinitionParserTest { private final PipelineDef pipelineDefWithUdf = new PipelineDef( + null, new SourceDef("values", null, new Configuration()), new SinkDef( "values", diff --git a/flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml b/flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml new file mode 100644 index 000000000..41546121c --- /dev/null +++ b/flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml @@ -0,0 +1,53 @@ +################################################################################ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +sources: + - type: mysql + hostname: 127.0.0.1 + port: 50001 + username: datawings + password: 123 + tables: test.table01 + server-id: 5400-5404 + server-time-zone: Asia/Shanghai + + - type: mysql + hostname: 127.0.0.2 + port: 50002 + username: datawings + password: 123 + tables: test.table02 + server-id: 5404-5408 + server-time-zone: Asia/Shanghai + + +route: + - source-table: test.table01 + sink-table: test.table01 + description: sync table to destination table1 + - source-table: test.table02 + sink-table: test.table02 + description: sync table to destination table2 + +sink: + type: doris + fenodes: 127.0.0.1:9033 + username: root + password: 123 + table.create.properties.light_schema_change: false + table.create.properties.replication_num: 1 + +pipeline: + name: Sync MySQL Database to doris + parallelism: 1 diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java index c81d45fd9..ffad8e8f1 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java @@ -23,6 +23,8 @@ import org.apache.flink.cdc.common.types.LocalZonedTimestampType; import org.apache.flink.cdc.composer.PipelineComposer; import org.apache.flink.cdc.composer.PipelineExecution; +import javax.annotation.Nullable; + import java.time.ZoneId; 
import java.util.ArrayList; import java.util.List; @@ -51,7 +53,8 @@ import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCA * before being submitted to the computing engine. */ public class PipelineDef { - private final SourceDef source; + @Nullable private final List sources; + @Nullable private final SourceDef source; private final SinkDef sink; private final List routes; private final List transforms; @@ -60,6 +63,7 @@ public class PipelineDef { private final Configuration config; public PipelineDef( + List sources, SourceDef source, SinkDef sink, List routes, @@ -67,25 +71,32 @@ public class PipelineDef { List udfs, List models, Configuration config) { - this.source = source; + this.sources = sources; this.sink = sink; this.routes = routes; this.transforms = transforms; this.udfs = udfs; this.models = models; this.config = evaluatePipelineTimeZone(config); + this.source = source; } public PipelineDef( + List sources, SourceDef source, SinkDef sink, List routes, List transforms, List udfs, Configuration config) { - this(source, sink, routes, transforms, udfs, new ArrayList<>(), config); + this(sources, source, sink, routes, transforms, udfs, new ArrayList<>(), config); + } + + public List getSources() { + return sources; } + @Nullable public SourceDef getSource() { return source; } @@ -117,9 +128,11 @@ public class PipelineDef { @Override public String toString() { return "PipelineDef{" - + "source=" + + "sources=" + + sources + + ",source=" + source - + ", sink=" + + ",sink=" + sink + ", routes=" + routes @@ -143,7 +156,8 @@ public class PipelineDef { return false; } PipelineDef that = (PipelineDef) o; - return Objects.equals(source, that.source) + return Objects.equals(sources, that.sources) + && Objects.equals(source, that.source) && Objects.equals(sink, that.sink) && Objects.equals(routes, that.routes) && Objects.equals(transforms, that.transforms) @@ -154,7 +168,7 @@ public class PipelineDef { @Override public int hashCode() { - 
return Objects.hash(source, sink, routes, transforms, udfs, models, config); + return Objects.hash(sources, source, sink, routes, transforms, udfs, models, config); } // ------------------------------------------------------------------------ diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index 533868c56..e14ca51f7 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.composer.PipelineComposer; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.SourceDef; import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator; import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator; import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator; @@ -126,16 +127,41 @@ public class FlinkPipelineComposer implements PipelineComposer { // And required constructors OperatorIDGenerator schemaOperatorIDGenerator = new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); - DataSource dataSource = - sourceTranslator.createDataSource(pipelineDef.getSource(), pipelineDefConfig, env); - DataSink dataSink = - sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env); + List sourceDefs = pipelineDef.getSources(); - boolean isParallelMetadataSource = dataSource.isParallelMetadataSource(); + // O ---> Source + DataStream stream = null; + DataSource dataSource = null; + boolean isParallelMetadataSource; // O ---> Source - DataStream stream = - 
sourceTranslator.translate(pipelineDef.getSource(), dataSource, env, parallelism); + if (sourceDefs != null) { + for (SourceDef source : sourceDefs) { + dataSource = sourceTranslator.createDataSource(source, pipelineDefConfig, env); + DataStream streamBranch = + sourceTranslator.translate(source, dataSource, env, parallelism); + if (stream == null) { + stream = streamBranch; + } else { + stream = stream.union(streamBranch); + } + } + if (sourceDefs.size() > 1) { + isParallelMetadataSource = true; + } else { + isParallelMetadataSource = dataSource.isParallelMetadataSource(); + } + } else { + dataSource = + sourceTranslator.createDataSource( + pipelineDef.getSource(), pipelineDefConfig, env); + stream = + sourceTranslator.translate( + pipelineDef.getSource(), dataSource, env, parallelism); + isParallelMetadataSource = dataSource.isParallelMetadataSource(); + } + DataSink dataSink = + sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env); // Source ---> PreTransform stream = diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkParallelizedPipelineITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkParallelizedPipelineITCase.java index 1920227da..8d1142772 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkParallelizedPipelineITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkParallelizedPipelineITCase.java @@ -980,6 +980,7 @@ class FlinkParallelizedPipelineITCase { pipelineConfig.set(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, exception); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, traits == SourceTraits.MERGING ? 
ROUTING_RULES : Collections.emptyList(), diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 380baaa5d..5ade4d5af 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -151,6 +151,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -184,6 +185,90 @@ class FlinkPipelineComposerITCase { "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}"); } + @ParameterizedTest + @EnumSource + void testSingleSplitMultipleSources(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig1 = new Configuration(); + sourceConfig1.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE); + Configuration sourceConfig2 = new Configuration(); + sourceConfig2.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES); + SourceDef sourceDef1 = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source1", sourceConfig1); + SourceDef sourceDef2 = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source2", sourceConfig2); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + 
+ // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + List sourceDefs = new ArrayList<>(); + sourceDefs.add(sourceDef1); + sourceDefs.add(sourceDef2); + PipelineDef pipelineDef = + new PipelineDef( + sourceDefs, + null, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + List table1Results = ValuesDatabase.getResults(TABLE_1); + assertThat(table1Results) + .containsExactly( + "default_namespace.default_schema.table1:col1=2;newCol3=x", + "default_namespace.default_schema.table1:col1=3;newCol3="); + List table2Results = ValuesDatabase.getResults(TABLE_2); + assertThat(table2Results) + .contains( + "default_namespace.default_schema.table2:col1=1;col2=1", + "default_namespace.default_schema.table2:col1=2;col2=2", + "default_namespace.default_schema.table2:col1=3;col2=3"); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactlyInAnyOrder( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, 
meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}", + "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", + "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 1], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2, 2], after=[2, x, x], op=UPDATE, meta=()}"); + } + @ParameterizedTest @EnumSource void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Exception { @@ -210,6 +295,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -277,6 +363,7 @@ class FlinkPipelineComposerITCase { 
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, MAX_PARALLELISM); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -335,6 +422,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -398,6 +486,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -470,6 +559,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -527,6 +617,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, routeDef, @@ -602,6 +693,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, routeDef, @@ -801,6 +893,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, routeDef, @@ -1010,6 +1103,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, routeDef, @@ -1075,6 +1169,7 @@ class FlinkPipelineComposerITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.singletonList( @@ -1141,6 +1236,7 @@ class 
FlinkPipelineComposerITCase { pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/New_York"); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Arrays.asList( @@ -1251,8 +1347,11 @@ class FlinkPipelineComposerITCase { pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); pipelineConfig.set( PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + List sourceDefs = new ArrayList<>(); + sourceDefs.add(sourceDef); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.singletonList( diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java index 358094aa6..f256e9f55 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java @@ -136,9 +136,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -194,9 +194,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -253,9 +253,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -322,6 +322,7 @@ class 
FlinkPipelineComposerLenientITCase { pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, MAX_PARALLELISM); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -376,9 +377,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -437,9 +438,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -507,9 +508,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -562,9 +563,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, routeDef, @@ -635,9 +636,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, routeDef, @@ -832,9 +833,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, routeDef, @@ -1040,9 +1041,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline 
Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, routeDef, @@ -1104,9 +1105,9 @@ class FlinkPipelineComposerLenientITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.singletonList( diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 6a775e74a..6bd428d9c 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -784,6 +784,7 @@ class FlinkPipelineTransformITCase { pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles"); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -859,6 +860,7 @@ class FlinkPipelineTransformITCase { pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -966,6 +968,7 @@ class FlinkPipelineTransformITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -1052,6 +1055,7 @@ class FlinkPipelineTransformITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -1147,6 +1151,7 @@ class FlinkPipelineTransformITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -1230,6 +1235,7 @@ class FlinkPipelineTransformITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -1325,6 +1331,7 @@ class FlinkPipelineTransformITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -1421,6 +1428,7 @@ class FlinkPipelineTransformITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -1516,6 +1524,7 @@ class FlinkPipelineTransformITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -1633,8 +1642,10 @@ class FlinkPipelineTransformITCase { // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -1700,6 +1711,7 @@ class FlinkPipelineTransformITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -1874,6 +1886,7 @@ class FlinkPipelineTransformITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index b97b4b6de..4a9708f99 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -150,6 +150,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -221,6 +222,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -290,6 +292,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -361,6 +364,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -434,6 +438,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -498,6 +503,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -594,6 +600,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -664,6 +671,7 @@ public 
class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -732,6 +740,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -796,6 +805,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), @@ -875,6 +885,7 @@ public class FlinkPipelineUdfITCase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java index 4bd9806a5..24c4b4bb0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java @@ -142,6 +142,7 @@ public class MySqlParallelizedPipelineITCase extends MySqlSourceTestBase { PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); PipelineDef pipelineDef = new PipelineDef( + null, sourceDef, sinkDef, Collections.emptyList(),