[FLINK-36794] [cdc-composer/cli] pipeline cdc connector support multiple data sources

pull/3844/head
linjc13 3 weeks ago
parent 92081dfe58
commit 994d17a950

@ -50,9 +50,9 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量
</table>
</div>
## 示例
## 单数据源示例
从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
单数据源,单个 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
```yaml
source:
@ -77,6 +77,44 @@ pipeline:
parallelism: 4
```
## 多数据源示例
多数据源从多个mysql数据源读取数据同步到 Doris 的 Pipeline 可以定义如下:
```yaml
sources:
- type: mysql
name: MySQL multiple Source1
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai
- type: mysql
name: MySQL multiple Source2
hostname: 127.0.0.2
port: 3307
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5405-5409
server-time-zone: Asia/Shanghai
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
```
## 连接器配置项
<div class="highlight">

@ -51,9 +51,9 @@ You may need to configure the following dependencies manually, and pass it with
</table>
</div>
## Example
## single data source Example
An example of the pipeline for reading data from MySQL and sink to Doris can be defined as follows:
An example of the pipeline for reading data from single MySQL and sink to Doris can be defined as follows:
```yaml
source:
@ -78,6 +78,44 @@ pipeline:
parallelism: 4
```
## multiple data source Example
An example of the pipeline for reading data from multiple MySQL datasource and sink to Doris can be defined as follows:
```yaml
sources:
- type: mysql
name: MySQL multiple Source1
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5400-5404
server-time-zone: Asia/Shanghai
- type: mysql
name: MySQL multiple Source2
hostname: 127.0.0.2
port: 3307
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5405-5409
server-time-zone: Asia/Shanghai
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
```
## Connector Options
<div class="highlight">

@ -40,6 +40,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -55,6 +56,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// Parent node keys
private static final String SOURCE_KEY = "source";
private static final String MULTIPLE_SOURCE_KEY = "sources";
private static final String SINK_KEY = "sink";
private static final String ROUTE_KEY = "route";
private static final String TRANSFORM_KEY = "transform";
@ -63,6 +65,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// Source / sink keys
private static final String TYPE_KEY = "type";
private static final String SOURCES = "sources";
private static final String NAME_KEY = "name";
private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
@ -135,13 +138,20 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
SchemaChangeBehavior schemaChangeBehavior =
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
// Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
JsonNode multipleSourceNode = pipelineDefJsonNode.get(MULTIPLE_SOURCE_KEY);
List<SourceDef> sourceDefs = new ArrayList<>();
SourceDef sourceDef = null;
if (multipleSourceNode != null) {
Iterator<JsonNode> it = multipleSourceNode.elements();
while (it.hasNext()) {
JsonNode sourceNode = it.next();
getSourceDefs(sourceNode, sourceDefs);
}
} else {
JsonNode sourceNode = pipelineDefJsonNode.get(SOURCE_KEY);
// Source is required
sourceDef = getSourceDefs(sourceNode, sourceDefs);
}
// Sink is required
SinkDef sinkDef =
@ -171,7 +181,25 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
pipelineConfig.addAll(userPipelineConfig);
return new PipelineDef(
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
sourceDefs,
sourceDef,
sinkDef,
routeDefs,
transformDefs,
udfDefs,
modelDefs,
pipelineConfig);
}
private SourceDef getSourceDefs(JsonNode root, List<SourceDef> sourceDefs) {
SourceDef sourceDef =
toSourceDef(
checkNotNull(
root,
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
sourceDefs.add(sourceDef);
return sourceDef;
}
private SourceDef toSourceDef(JsonNode sourceNode) {

@ -38,9 +38,11 @@ import java.net.URL;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
@ -278,6 +280,7 @@ class YamlPipelineDefinitionParserTest {
assertThat(pipelineDef)
.isEqualTo(
new PipelineDef(
null,
new SourceDef("foo", null, new Configuration()),
new SinkDef("bar", null, new Configuration(), expected),
Collections.emptyList(),
@ -290,25 +293,36 @@ class YamlPipelineDefinitionParserTest {
.build())));
}
@Test
void testMultipleSourceDefinition() throws Exception {
URL resource = Resources.getResource("definitions/multiple_source_mtdbak.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
assertThat(pipelineDef).isInstanceOf(PipelineDef.class);
}
SourceDef sourceDef =
new SourceDef(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "pass")
.put(
"tables",
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
.put("chunk-column", "app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.build()));
List<SourceDef> sourceDefs = new ArrayList<>(Arrays.asList(new SourceDef[] {sourceDef}));
private final PipelineDef fullDef =
new PipelineDef(
new SourceDef(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "pass")
.put(
"tables",
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.build())),
null,
sourceDef,
new SinkDef(
"kafka",
"sink-queue",
@ -426,25 +440,27 @@ class YamlPipelineDefinitionParserTest {
assertThat(pipelineDef).isEqualTo(fullDef);
}
SourceDef fullsourceDef =
new SourceDef(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "pass")
.put(
"tables",
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
.put("chunk-column", "app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.build()));
private final PipelineDef fullDefWithGlobalConf =
new PipelineDef(
new SourceDef(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "pass")
.put(
"tables",
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.build())),
null,
fullsourceDef,
new SinkDef(
"kafka",
"sink-queue",
@ -504,21 +520,25 @@ class YamlPipelineDefinitionParserTest {
.put("schema-operator.rpc-timeout", "1 h")
.build()));
SourceDef defSourceDef =
new SourceDef(
"mysql",
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "pass")
.put(
"tables",
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
.build()));
private final PipelineDef defWithOptional =
new PipelineDef(
new SourceDef(
"mysql",
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "pass")
.put(
"tables",
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
.build())),
null,
defSourceDef,
new SinkDef(
"kafka",
null,
@ -545,9 +565,12 @@ class YamlPipelineDefinitionParserTest {
.put("parallelism", "4")
.build()));
SourceDef mysqlSourceDef = new SourceDef("mysql", null, new Configuration());
private final PipelineDef minimizedDef =
new PipelineDef(
new SourceDef("mysql", null, new Configuration()),
null,
mysqlSourceDef,
new SinkDef(
"kafka",
null,
@ -565,25 +588,27 @@ class YamlPipelineDefinitionParserTest {
Collections.singletonMap(
"local-time-zone", ZoneId.systemDefault().toString())));
SourceDef routeRepSymDef =
new SourceDef(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "pass")
.put(
"tables",
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
.put("chunk-column", "app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.build()));
private final PipelineDef fullDefWithRouteRepSym =
new PipelineDef(
new SourceDef(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "localhost")
.put("port", "3306")
.put("username", "admin")
.put("password", "pass")
.put(
"tables",
"adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*")
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.build())),
null,
routeRepSymDef,
new SinkDef(
"kafka",
"sink-queue",
@ -633,6 +658,7 @@ class YamlPipelineDefinitionParserTest {
private final PipelineDef pipelineDefWithUdf =
new PipelineDef(
null,
new SourceDef("values", null, new Configuration()),
new SinkDef(
"values",

@ -0,0 +1,53 @@
################################################################################
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
sources:
- type: mysql
hostname: 127.0.0.1
port: 50001
username: datawings
password: 123
tables: test.table01
server-id: 5400-5404
server-time-zone: Asia/Shanghai
- type: mysql
hostname: 127.0.0.2
port: 50002
username: datawings
password: 123
tables: test.table02
server-id: 5404-5408
server-time-zone: Asia/Shanghai
route:
- source-table: test.table01
sink-table: test.table01
description: sync table to destination table1
- source-table: test.table02
sink-table: test.table02
description: sync table to destination table2
sink:
type: doris
fenodes: 127.0.0.1:9033
username: root
password: 123
table.create.properties.light_schema_change: false
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to doris
parallelism: 1

@ -23,6 +23,8 @@ import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import javax.annotation.Nullable;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
@ -51,7 +53,8 @@ import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCA
* before being submitted to the computing engine.
*/
public class PipelineDef {
private final SourceDef source;
@Nullable private final List<SourceDef> sources;
@Nullable private final SourceDef source;
private final SinkDef sink;
private final List<RouteDef> routes;
private final List<TransformDef> transforms;
@ -60,6 +63,7 @@ public class PipelineDef {
private final Configuration config;
public PipelineDef(
List<SourceDef> sources,
SourceDef source,
SinkDef sink,
List<RouteDef> routes,
@ -67,25 +71,32 @@ public class PipelineDef {
List<UdfDef> udfs,
List<ModelDef> models,
Configuration config) {
this.source = source;
this.sources = sources;
this.sink = sink;
this.routes = routes;
this.transforms = transforms;
this.udfs = udfs;
this.models = models;
this.config = evaluatePipelineTimeZone(config);
this.source = source;
}
public PipelineDef(
List<SourceDef> sources,
SourceDef source,
SinkDef sink,
List<RouteDef> routes,
List<TransformDef> transforms,
List<UdfDef> udfs,
Configuration config) {
this(source, sink, routes, transforms, udfs, new ArrayList<>(), config);
this(sources, source, sink, routes, transforms, udfs, new ArrayList<>(), config);
}
public List<SourceDef> getSources() {
return sources;
}
@Nullable
public SourceDef getSource() {
return source;
}
@ -117,9 +128,11 @@ public class PipelineDef {
@Override
public String toString() {
return "PipelineDef{"
+ "source="
+ "sources="
+ sources
+ ",source="
+ source
+ ", sink="
+ ",sink="
+ sink
+ ", routes="
+ routes
@ -143,7 +156,8 @@ public class PipelineDef {
return false;
}
PipelineDef that = (PipelineDef) o;
return Objects.equals(source, that.source)
return Objects.equals(sources, that.sources)
&& Objects.equals(source, that.source)
&& Objects.equals(sink, that.sink)
&& Objects.equals(routes, that.routes)
&& Objects.equals(transforms, that.transforms)
@ -154,7 +168,7 @@ public class PipelineDef {
@Override
public int hashCode() {
return Objects.hash(source, sink, routes, transforms, udfs, models, config);
return Objects.hash(sources, source, sink, routes, transforms, udfs, models, config);
}
// ------------------------------------------------------------------------

@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
@ -126,16 +127,41 @@ public class FlinkPipelineComposer implements PipelineComposer {
// And required constructors
OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
DataSource dataSource =
sourceTranslator.createDataSource(pipelineDef.getSource(), pipelineDefConfig, env);
DataSink dataSink =
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
List<SourceDef> sourceDefs = pipelineDef.getSources();
boolean isParallelMetadataSource = dataSource.isParallelMetadataSource();
// O ---> Source
DataStream<Event> stream = null;
DataSource dataSource = null;
boolean isParallelMetadataSource;
// O ---> Source
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), dataSource, env, parallelism);
if (sourceDefs != null) {
for (SourceDef source : sourceDefs) {
dataSource = sourceTranslator.createDataSource(source, pipelineDefConfig, env);
DataStream<Event> streamBranch =
sourceTranslator.translate(source, dataSource, env, parallelism);
if (stream == null) {
stream = streamBranch;
} else {
stream = stream.union(streamBranch);
}
}
if (sourceDefs.size() > 1) {
isParallelMetadataSource = true;
} else {
isParallelMetadataSource = dataSource.isParallelMetadataSource();
}
} else {
dataSource =
sourceTranslator.createDataSource(
pipelineDef.getSource(), pipelineDefConfig, env);
stream =
sourceTranslator.translate(
pipelineDef.getSource(), dataSource, env, parallelism);
isParallelMetadataSource = dataSource.isParallelMetadataSource();
}
DataSink dataSink =
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
// Source ---> PreTransform
stream =

@ -980,6 +980,7 @@ class FlinkParallelizedPipelineITCase {
pipelineConfig.set(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, exception);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
traits == SourceTraits.MERGING ? ROUTING_RULES : Collections.emptyList(),

@ -151,6 +151,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -184,6 +185,90 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}");
}
@ParameterizedTest
@EnumSource
void testSingleSplitMultipleSources(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
Configuration sourceConfig1 = new Configuration();
sourceConfig1.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
Configuration sourceConfig2 = new Configuration();
sourceConfig2.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
SourceDef sourceDef1 =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source1", sourceConfig1);
SourceDef sourceDef2 =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source2", sourceConfig2);
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
List<SourceDef> sourceDefs = new ArrayList<>();
sourceDefs.add(sourceDef1);
sourceDefs.add(sourceDef2);
PipelineDef pipelineDef =
new PipelineDef(
sourceDefs,
null,
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
// Check result in ValuesDatabase
List<String> table1Results = ValuesDatabase.getResults(TABLE_1);
assertThat(table1Results)
.containsExactly(
"default_namespace.default_schema.table1:col1=2;newCol3=x",
"default_namespace.default_schema.table1:col1=3;newCol3=");
List<String> table2Results = ValuesDatabase.getResults(TABLE_2);
assertThat(table2Results)
.contains(
"default_namespace.default_schema.table2:col1=1;col2=1",
"default_namespace.default_schema.table2:col1=2;col2=2",
"default_namespace.default_schema.table2:col1=3;col2=3");
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactlyInAnyOrder(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2, 2], after=[2, x, x], op=UPDATE, meta=()}");
}
@ParameterizedTest
@EnumSource
void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Exception {
@ -210,6 +295,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -277,6 +363,7 @@ class FlinkPipelineComposerITCase {
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, MAX_PARALLELISM);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -335,6 +422,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -398,6 +486,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -470,6 +559,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -527,6 +617,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
routeDef,
@ -602,6 +693,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
routeDef,
@ -801,6 +893,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
routeDef,
@ -1010,6 +1103,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
routeDef,
@ -1075,6 +1169,7 @@ class FlinkPipelineComposerITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.singletonList(
@ -1141,6 +1236,7 @@ class FlinkPipelineComposerITCase {
pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/New_York");
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Arrays.asList(
@ -1251,8 +1347,11 @@ class FlinkPipelineComposerITCase {
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
List<SourceDef> sourceDefs = new ArrayList<>();
sourceDefs.add(sourceDef);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.singletonList(

@ -136,9 +136,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -194,9 +194,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -253,9 +253,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -322,6 +322,7 @@ class FlinkPipelineComposerLenientITCase {
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, MAX_PARALLELISM);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -376,9 +377,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -437,9 +438,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -507,9 +508,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -562,9 +563,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
routeDef,
@ -635,9 +636,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
routeDef,
@ -832,9 +833,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
routeDef,
@ -1040,9 +1041,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
routeDef,
@ -1104,9 +1105,9 @@ class FlinkPipelineComposerLenientITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.singletonList(

@ -784,6 +784,7 @@ class FlinkPipelineTransformITCase {
pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -859,6 +860,7 @@ class FlinkPipelineTransformITCase {
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -966,6 +968,7 @@ class FlinkPipelineTransformITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -1052,6 +1055,7 @@ class FlinkPipelineTransformITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -1147,6 +1151,7 @@ class FlinkPipelineTransformITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -1230,6 +1235,7 @@ class FlinkPipelineTransformITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -1325,6 +1331,7 @@ class FlinkPipelineTransformITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -1421,6 +1428,7 @@ class FlinkPipelineTransformITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -1516,6 +1524,7 @@ class FlinkPipelineTransformITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -1633,8 +1642,10 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -1700,6 +1711,7 @@ class FlinkPipelineTransformITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -1874,6 +1886,7 @@ class FlinkPipelineTransformITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),

@ -150,6 +150,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -221,6 +222,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -290,6 +292,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -361,6 +364,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -434,6 +438,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -498,6 +503,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -594,6 +600,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -664,6 +671,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -732,6 +740,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -796,6 +805,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),
@ -875,6 +885,7 @@ public class FlinkPipelineUdfITCase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),

@ -142,6 +142,7 @@ public class MySqlParallelizedPipelineITCase extends MySqlSourceTestBase {
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
null,
sourceDef,
sinkDef,
Collections.emptyList(),

Loading…
Cancel
Save