From 994d17a95093b0a8dcb1d95ff705cbe2c82d4cba Mon Sep 17 00:00:00 2001
From: linjc13 <linjc13@chinatelecom.cn>
Date: Wed, 8 Jan 2025 18:55:42 +0800
Subject: [PATCH] [FLINK-36794] [cdc-composer/cli] pipeline cdc connector
 support multiple data sources

---
 .../connectors/pipeline-connectors/mysql.md   |  42 ++++-
 .../connectors/pipeline-connectors/mysql.md   |  42 ++++-
 .../parser/YamlPipelineDefinitionParser.java  |  44 ++++-
 .../YamlPipelineDefinitionParserTest.java     | 156 ++++++++++--------
 .../definitions/multiple_source_mtd.yaml      |  53 ++++++
 .../cdc/composer/definition/PipelineDef.java  |  28 +++-
 .../composer/flink/FlinkPipelineComposer.java |  40 ++++-
 .../FlinkParallelizedPipelineITCase.java      |   1 +
 .../flink/FlinkPipelineComposerITCase.java    |  99 +++++++++++
 .../FlinkPipelineComposerLenientITCase.java   |  23 +--
 .../flink/FlinkPipelineTransformITCase.java   |  13 ++
 .../flink/FlinkPipelineUdfITCase.java         |  11 ++
 .../MySqlParallelizedPipelineITCase.java      |   1 +
 13 files changed, 451 insertions(+), 102 deletions(-)
 create mode 100644 flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index a5f0924c3..f867b7d08 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -50,9 +50,9 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量
 </table>
 </div>
 
-## 示例
+## 单数据源示例
 
-从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
+单数据源,从单个 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
 
 ```yaml
 source:
@@ -77,6 +77,44 @@ pipeline:
    parallelism: 4
 ```
 
+## 多数据源示例
+
+多数据源,从多个mysql数据源读取数据同步到 Doris 的 Pipeline 可以定义如下:
+
+```yaml
+sources:
+   - type: mysql
+     name: MySQL multiple Source1
+     hostname: 127.0.0.1
+     port: 3306
+     username: admin
+     password: pass
+     tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+     server-id: 5400-5404
+     server-time-zone: Asia/Shanghai
+
+   - type: mysql
+       name: MySQL multiple Source2
+     hostname: 127.0.0.2
+     port: 3307
+     username: admin
+     password: pass
+     tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+     server-id: 5405-5409
+     server-time-zone: Asia/Shanghai   
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+   name: MySQL to Doris Pipeline
+   parallelism: 4
+```
+
 ## 连接器配置项
 
 <div class="highlight">
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 3cc5c17a1..f7e904148 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -51,9 +51,9 @@ You may need to configure the following dependencies manually, and pass it with
 </table>
 </div>
 
-## Example
+## single data source Example
 
-An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows:
+An example of the pipeline for reading data from single MySQL and sink to Doris can be defined as follows:
 
 ```yaml
 source:
@@ -78,6 +78,44 @@ pipeline:
    parallelism: 4
 ```
 
+## multiple data source Example
+
+An example of the pipeline for reading data from multiple MySQL datasource and sink to Doris can be defined as follows:
+
+```yaml
+sources:
+  - type: mysql
+    name: MySQL multiple Source1
+    hostname: 127.0.0.1
+    port: 3306
+    username: admin
+    password: pass
+    tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+    server-id: 5400-5404
+    server-time-zone: Asia/Shanghai
+
+  - type: mysql
+      name: MySQL multiple Source2
+    hostname: 127.0.0.2
+    port: 3307
+    username: admin
+    password: pass
+    tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+    server-id: 5405-5409
+    server-time-zone: Asia/Shanghai
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+   name: MySQL to Doris Pipeline
+   parallelism: 4
+```
+
 ## Connector Options
 
 <div class="highlight">
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index b594aa35b..b33d30777 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -40,6 +40,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -55,6 +56,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
 
     // Parent node keys
     private static final String SOURCE_KEY = "source";
+    private static final String MULTIPLE_SOURCE_KEY = "sources";
     private static final String SINK_KEY = "sink";
     private static final String ROUTE_KEY = "route";
     private static final String TRANSFORM_KEY = "transform";
@@ -63,6 +65,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
 
     // Source / sink keys
     private static final String TYPE_KEY = "type";
+    private static final String SOURCES = "sources";
     private static final String NAME_KEY = "name";
     private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
     private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
@@ -135,13 +138,20 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
         SchemaChangeBehavior schemaChangeBehavior =
                 userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
 
-        // Source is required
-        SourceDef sourceDef =
-                toSourceDef(
-                        checkNotNull(
-                                pipelineDefJsonNode.get(SOURCE_KEY),
-                                "Missing required field \"%s\" in pipeline definition",
-                                SOURCE_KEY));
+        JsonNode multipleSourceNode = pipelineDefJsonNode.get(MULTIPLE_SOURCE_KEY);
+        List<SourceDef> sourceDefs = new ArrayList<>();
+        SourceDef sourceDef = null;
+        if (multipleSourceNode != null) {
+            Iterator<JsonNode> it = multipleSourceNode.elements();
+            while (it.hasNext()) {
+                JsonNode sourceNode = it.next();
+                getSourceDefs(sourceNode, sourceDefs);
+            }
+        } else {
+            JsonNode sourceNode = pipelineDefJsonNode.get(SOURCE_KEY);
+            // Source is required
+            sourceDef = getSourceDefs(sourceNode, sourceDefs);
+        }
 
         // Sink is required
         SinkDef sinkDef =
@@ -171,7 +181,25 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
         pipelineConfig.addAll(userPipelineConfig);
 
         return new PipelineDef(
-                sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
+                sourceDefs,
+                sourceDef,
+                sinkDef,
+                routeDefs,
+                transformDefs,
+                udfDefs,
+                modelDefs,
+                pipelineConfig);
+    }
+
+    private SourceDef getSourceDefs(JsonNode root, List<SourceDef> sourceDefs) {
+        SourceDef sourceDef =
+                toSourceDef(
+                        checkNotNull(
+                                root,
+                                "Missing required field \"%s\" in pipeline definition",
+                                SOURCE_KEY));
+        sourceDefs.add(sourceDef);
+        return sourceDef;
     }
 
     private SourceDef toSourceDef(JsonNode sourceNode) {
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 27cbbb1de..6364f65ff 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -38,9 +38,11 @@ import java.net.URL;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Set;
 
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
@@ -278,6 +280,7 @@ class YamlPipelineDefinitionParserTest {
         assertThat(pipelineDef)
                 .isEqualTo(
                         new PipelineDef(
+                                null,
                                 new SourceDef("foo", null, new Configuration()),
                                 new SinkDef("bar", null, new Configuration(), expected),
                                 Collections.emptyList(),
@@ -290,25 +293,36 @@ class YamlPipelineDefinitionParserTest {
                                                 .build())));
     }
 
+    @Test
+    void testMultipleSourceDefinition() throws Exception {
+        URL resource = Resources.getResource("definitions/multiple_source_mtdbak.yaml");
+        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
+        assertThat(pipelineDef).isInstanceOf(PipelineDef.class);
+    }
+
+    SourceDef sourceDef =
+            new SourceDef(
+                    "mysql",
+                    "source-database",
+                    Configuration.fromMap(
+                            ImmutableMap.<String, String>builder()
+                                    .put("host", "localhost")
+                                    .put("port", "3306")
+                                    .put("username", "admin")
+                                    .put("password", "pass")
+                                    .put(
+                                            "tables",
+                                            "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
+                                    .put("chunk-column", "app_order_.*:id,web_order:product_id")
+                                    .put("capture-new-tables", "true")
+                                    .build()));
+    List<SourceDef> sourceDefs = new ArrayList<>(Arrays.asList(new SourceDef[] {sourceDef}));
+
     private final PipelineDef fullDef =
             new PipelineDef(
-                    new SourceDef(
-                            "mysql",
-                            "source-database",
-                            Configuration.fromMap(
-                                    ImmutableMap.<String, String>builder()
-                                            .put("host", "localhost")
-                                            .put("port", "3306")
-                                            .put("username", "admin")
-                                            .put("password", "pass")
-                                            .put(
-                                                    "tables",
-                                                    "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
-                                            .put(
-                                                    "chunk-column",
-                                                    "app_order_.*:id,web_order:product_id")
-                                            .put("capture-new-tables", "true")
-                                            .build())),
+                    null,
+                    sourceDef,
                     new SinkDef(
                             "kafka",
                             "sink-queue",
@@ -426,25 +440,27 @@ class YamlPipelineDefinitionParserTest {
         assertThat(pipelineDef).isEqualTo(fullDef);
     }
 
+    SourceDef fullsourceDef =
+            new SourceDef(
+                    "mysql",
+                    "source-database",
+                    Configuration.fromMap(
+                            ImmutableMap.<String, String>builder()
+                                    .put("host", "localhost")
+                                    .put("port", "3306")
+                                    .put("username", "admin")
+                                    .put("password", "pass")
+                                    .put(
+                                            "tables",
+                                            "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
+                                    .put("chunk-column", "app_order_.*:id,web_order:product_id")
+                                    .put("capture-new-tables", "true")
+                                    .build()));
+
     private final PipelineDef fullDefWithGlobalConf =
             new PipelineDef(
-                    new SourceDef(
-                            "mysql",
-                            "source-database",
-                            Configuration.fromMap(
-                                    ImmutableMap.<String, String>builder()
-                                            .put("host", "localhost")
-                                            .put("port", "3306")
-                                            .put("username", "admin")
-                                            .put("password", "pass")
-                                            .put(
-                                                    "tables",
-                                                    "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
-                                            .put(
-                                                    "chunk-column",
-                                                    "app_order_.*:id,web_order:product_id")
-                                            .put("capture-new-tables", "true")
-                                            .build())),
+                    null,
+                    fullsourceDef,
                     new SinkDef(
                             "kafka",
                             "sink-queue",
@@ -504,21 +520,25 @@ class YamlPipelineDefinitionParserTest {
                                     .put("schema-operator.rpc-timeout", "1 h")
                                     .build()));
 
+    SourceDef defSourceDef =
+            new SourceDef(
+                    "mysql",
+                    null,
+                    Configuration.fromMap(
+                            ImmutableMap.<String, String>builder()
+                                    .put("host", "localhost")
+                                    .put("port", "3306")
+                                    .put("username", "admin")
+                                    .put("password", "pass")
+                                    .put(
+                                            "tables",
+                                            "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
+                                    .build()));
+
     private final PipelineDef defWithOptional =
             new PipelineDef(
-                    new SourceDef(
-                            "mysql",
-                            null,
-                            Configuration.fromMap(
-                                    ImmutableMap.<String, String>builder()
-                                            .put("host", "localhost")
-                                            .put("port", "3306")
-                                            .put("username", "admin")
-                                            .put("password", "pass")
-                                            .put(
-                                                    "tables",
-                                                    "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
-                                            .build())),
+                    null,
+                    defSourceDef,
                     new SinkDef(
                             "kafka",
                             null,
@@ -545,9 +565,12 @@ class YamlPipelineDefinitionParserTest {
                                     .put("parallelism", "4")
                                     .build()));
 
+    SourceDef mysqlSourceDef = new SourceDef("mysql", null, new Configuration());
+
     private final PipelineDef minimizedDef =
             new PipelineDef(
-                    new SourceDef("mysql", null, new Configuration()),
+                    null,
+                    mysqlSourceDef,
                     new SinkDef(
                             "kafka",
                             null,
@@ -565,25 +588,27 @@ class YamlPipelineDefinitionParserTest {
                             Collections.singletonMap(
                                     "local-time-zone", ZoneId.systemDefault().toString())));
 
+    SourceDef routeRepSymDef =
+            new SourceDef(
+                    "mysql",
+                    "source-database",
+                    Configuration.fromMap(
+                            ImmutableMap.<String, String>builder()
+                                    .put("host", "localhost")
+                                    .put("port", "3306")
+                                    .put("username", "admin")
+                                    .put("password", "pass")
+                                    .put(
+                                            "tables",
+                                            "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
+                                    .put("chunk-column", "app_order_.*:id,web_order:product_id")
+                                    .put("capture-new-tables", "true")
+                                    .build()));
+
     private final PipelineDef fullDefWithRouteRepSym =
             new PipelineDef(
-                    new SourceDef(
-                            "mysql",
-                            "source-database",
-                            Configuration.fromMap(
-                                    ImmutableMap.<String, String>builder()
-                                            .put("host", "localhost")
-                                            .put("port", "3306")
-                                            .put("username", "admin")
-                                            .put("password", "pass")
-                                            .put(
-                                                    "tables",
-                                                    "adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
-                                            .put(
-                                                    "chunk-column",
-                                                    "app_order_.*:id,web_order:product_id")
-                                            .put("capture-new-tables", "true")
-                                            .build())),
+                    null,
+                    routeRepSymDef,
                     new SinkDef(
                             "kafka",
                             "sink-queue",
@@ -633,6 +658,7 @@ class YamlPipelineDefinitionParserTest {
 
     private final PipelineDef pipelineDefWithUdf =
             new PipelineDef(
+                    null,
                     new SourceDef("values", null, new Configuration()),
                     new SinkDef(
                             "values",
diff --git a/flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml b/flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml
new file mode 100644
index 000000000..41546121c
--- /dev/null
+++ b/flink-cdc-cli/src/test/resources/definitions/multiple_source_mtd.yaml
@@ -0,0 +1,53 @@
+################################################################################
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+sources:
+  - type: mysql
+    hostname: 127.0.0.1
+    port: 50001
+    username: datawings
+    password: 123
+    tables: test.table01
+    server-id: 5400-5404
+    server-time-zone: Asia/Shanghai
+
+  - type: mysql
+    hostname: 127.0.0.2
+    port: 50002
+    username: datawings
+    password: 123
+    tables: test.table02
+    server-id: 5404-5408
+    server-time-zone: Asia/Shanghai
+
+
+route:
+  - source-table: test.table01
+    sink-table: test.table01
+    description: sync table to destination table1
+  - source-table: test.table02
+    sink-table: test.table02
+    description: sync table to destination table2
+
+sink:
+  type: doris
+  fenodes: 127.0.0.1:9033
+  username: root
+  password: 123
+  table.create.properties.light_schema_change: false
+  table.create.properties.replication_num: 1
+
+pipeline:
+  name: Sync MySQL Database to doris
+  parallelism: 1 
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
index c81d45fd9..ffad8e8f1 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
@@ -23,6 +23,8 @@ import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.composer.PipelineComposer;
 import org.apache.flink.cdc.composer.PipelineExecution;
 
+import javax.annotation.Nullable;
+
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
@@ -51,7 +53,8 @@ import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCA
  * before being submitted to the computing engine.
  */
 public class PipelineDef {
-    private final SourceDef source;
+    @Nullable private final List<SourceDef> sources;
+    @Nullable private final SourceDef source;
     private final SinkDef sink;
     private final List<RouteDef> routes;
     private final List<TransformDef> transforms;
@@ -60,6 +63,7 @@ public class PipelineDef {
     private final Configuration config;
 
     public PipelineDef(
+            List<SourceDef> sources,
             SourceDef source,
             SinkDef sink,
             List<RouteDef> routes,
@@ -67,25 +71,32 @@ public class PipelineDef {
             List<UdfDef> udfs,
             List<ModelDef> models,
             Configuration config) {
-        this.source = source;
+        this.sources = sources;
         this.sink = sink;
         this.routes = routes;
         this.transforms = transforms;
         this.udfs = udfs;
         this.models = models;
         this.config = evaluatePipelineTimeZone(config);
+        this.source = source;
     }
 
     public PipelineDef(
+            List<SourceDef> sources,
             SourceDef source,
             SinkDef sink,
             List<RouteDef> routes,
             List<TransformDef> transforms,
             List<UdfDef> udfs,
             Configuration config) {
-        this(source, sink, routes, transforms, udfs, new ArrayList<>(), config);
+        this(sources, source, sink, routes, transforms, udfs, new ArrayList<>(), config);
+    }
+
+    public List<SourceDef> getSources() {
+        return sources;
     }
 
+    @Nullable
     public SourceDef getSource() {
         return source;
     }
@@ -117,9 +128,11 @@ public class PipelineDef {
     @Override
     public String toString() {
         return "PipelineDef{"
-                + "source="
+                + "sources="
+                + sources
+                + ",source="
                 + source
-                + ", sink="
+                + ",sink="
                 + sink
                 + ", routes="
                 + routes
@@ -143,7 +156,8 @@ public class PipelineDef {
             return false;
         }
         PipelineDef that = (PipelineDef) o;
-        return Objects.equals(source, that.source)
+        return Objects.equals(sources, that.sources)
+                && Objects.equals(source, that.source)
                 && Objects.equals(sink, that.sink)
                 && Objects.equals(routes, that.routes)
                 && Objects.equals(transforms, that.transforms)
@@ -154,7 +168,7 @@ public class PipelineDef {
 
     @Override
     public int hashCode() {
-        return Objects.hash(source, sink, routes, transforms, udfs, models, config);
+        return Objects.hash(sources, source, sink, routes, transforms, udfs, models, config);
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index 533868c56..e14ca51f7 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.source.DataSource;
 import org.apache.flink.cdc.composer.PipelineComposer;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
 import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
 import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
 import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
@@ -126,16 +127,41 @@ public class FlinkPipelineComposer implements PipelineComposer {
         // And required constructors
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
-        DataSource dataSource =
-                sourceTranslator.createDataSource(pipelineDef.getSource(), pipelineDefConfig, env);
-        DataSink dataSink =
-                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
+        List<SourceDef> sourceDefs = pipelineDef.getSources();
 
-        boolean isParallelMetadataSource = dataSource.isParallelMetadataSource();
+        // O ---> Source
+        DataStream<Event> stream = null;
+        DataSource dataSource = null;
+        boolean isParallelMetadataSource;
 
         // O ---> Source
-        DataStream<Event> stream =
-                sourceTranslator.translate(pipelineDef.getSource(), dataSource, env, parallelism);
+        if (sourceDefs != null) {
+            for (SourceDef source : sourceDefs) {
+                dataSource = sourceTranslator.createDataSource(source, pipelineDefConfig, env);
+                DataStream<Event> streamBranch =
+                        sourceTranslator.translate(source, dataSource, env, parallelism);
+                if (stream == null) {
+                    stream = streamBranch;
+                } else {
+                    stream = stream.union(streamBranch);
+                }
+            }
+            if (sourceDefs.size() > 1) {
+                isParallelMetadataSource = true;
+            } else {
+                isParallelMetadataSource = dataSource.isParallelMetadataSource();
+            }
+        } else {
+            dataSource =
+                    sourceTranslator.createDataSource(
+                            pipelineDef.getSource(), pipelineDefConfig, env);
+            stream =
+                    sourceTranslator.translate(
+                            pipelineDef.getSource(), dataSource, env, parallelism);
+            isParallelMetadataSource = dataSource.isParallelMetadataSource();
+        }
+        DataSink dataSink =
+                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
 
         // Source ---> PreTransform
         stream =
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkParallelizedPipelineITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkParallelizedPipelineITCase.java
index 1920227da..8d1142772 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkParallelizedPipelineITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkParallelizedPipelineITCase.java
@@ -980,6 +980,7 @@ class FlinkParallelizedPipelineITCase {
         pipelineConfig.set(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, exception);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         traits == SourceTraits.MERGING ? ROUTING_RULES : Collections.emptyList(),
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 380baaa5d..5ade4d5af 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -151,6 +151,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -184,6 +185,90 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testSingleSplitMultipleSources(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig1 = new Configuration();
+        sourceConfig1.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
+        Configuration sourceConfig2 = new Configuration();
+        sourceConfig2.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
+        SourceDef sourceDef1 =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source1", sourceConfig1);
+        SourceDef sourceDef2 =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source2", sourceConfig2);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        List<SourceDef> sourceDefs = new ArrayList<>();
+        sourceDefs.add(sourceDef1);
+        sourceDefs.add(sourceDef2);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDefs,
+                        null,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check result in ValuesDatabase
+        List<String> table1Results = ValuesDatabase.getResults(TABLE_1);
+        assertThat(table1Results)
+                .containsExactly(
+                        "default_namespace.default_schema.table1:col1=2;newCol3=x",
+                        "default_namespace.default_schema.table1:col1=3;newCol3=");
+        List<String> table2Results = ValuesDatabase.getResults(TABLE_2);
+        assertThat(table2Results)
+                .contains(
+                        "default_namespace.default_schema.table2:col1=1;col2=1",
+                        "default_namespace.default_schema.table2:col1=2;col2=2",
+                        "default_namespace.default_schema.table2:col1=3;col2=3");
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactlyInAnyOrder(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
+                        "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
+                        "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, null], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, null], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, null], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 1], after=[], op=DELETE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2, 2], after=[2, x, x], op=UPDATE, meta=()}");
+    }
+
     @ParameterizedTest
     @EnumSource
     void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Exception {
@@ -210,6 +295,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -277,6 +363,7 @@ class FlinkPipelineComposerITCase {
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, MAX_PARALLELISM);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -335,6 +422,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -398,6 +486,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -470,6 +559,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -527,6 +617,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         routeDef,
@@ -602,6 +693,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         routeDef,
@@ -801,6 +893,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         routeDef,
@@ -1010,6 +1103,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         routeDef,
@@ -1075,6 +1169,7 @@ class FlinkPipelineComposerITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.singletonList(
@@ -1141,6 +1236,7 @@ class FlinkPipelineComposerITCase {
         pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/New_York");
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Arrays.asList(
@@ -1251,8 +1347,11 @@ class FlinkPipelineComposerITCase {
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
         pipelineConfig.set(
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        List<SourceDef> sourceDefs = new ArrayList<>();
+        sourceDefs.add(sourceDef);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.singletonList(
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
index 358094aa6..f256e9f55 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
@@ -136,9 +136,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -194,9 +194,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -253,9 +253,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -322,6 +322,7 @@ class FlinkPipelineComposerLenientITCase {
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, MAX_PARALLELISM);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -376,9 +377,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -437,9 +438,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -507,9 +508,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -562,9 +563,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         routeDef,
@@ -635,9 +636,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         routeDef,
@@ -832,9 +833,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         routeDef,
@@ -1040,9 +1041,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         routeDef,
@@ -1104,9 +1105,9 @@ class FlinkPipelineComposerLenientITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.singletonList(
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 6a775e74a..6bd428d9c 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -784,6 +784,7 @@ class FlinkPipelineTransformITCase {
         pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -859,6 +860,7 @@ class FlinkPipelineTransformITCase {
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -966,6 +968,7 @@ class FlinkPipelineTransformITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -1052,6 +1055,7 @@ class FlinkPipelineTransformITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -1147,6 +1151,7 @@ class FlinkPipelineTransformITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -1230,6 +1235,7 @@ class FlinkPipelineTransformITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -1325,6 +1331,7 @@ class FlinkPipelineTransformITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -1421,6 +1428,7 @@ class FlinkPipelineTransformITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -1516,6 +1524,7 @@ class FlinkPipelineTransformITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -1633,8 +1642,10 @@ class FlinkPipelineTransformITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -1700,6 +1711,7 @@ class FlinkPipelineTransformITCase {
                     PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
             PipelineDef pipelineDef =
                     new PipelineDef(
+                            null,
                             sourceDef,
                             sinkDef,
                             Collections.emptyList(),
@@ -1874,6 +1886,7 @@ class FlinkPipelineTransformITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
index b97b4b6de..4a9708f99 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
@@ -150,6 +150,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -221,6 +222,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -290,6 +292,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -361,6 +364,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -434,6 +438,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -498,6 +503,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -594,6 +600,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -664,6 +671,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -732,6 +740,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -796,6 +805,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
@@ -875,6 +885,7 @@ public class FlinkPipelineUdfITCase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java
index 4bd9806a5..24c4b4bb0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java
@@ -142,6 +142,7 @@ public class MySqlParallelizedPipelineITCase extends MySqlSourceTestBase {
                 PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
+                        null,
                         sourceDef,
                         sinkDef,
                         Collections.emptyList(),