[FLINK-35730][cdc-cli] PipelineDefinitionParser supports parsing pipeline def in text format

This closes #3444.
pull/3523/head
Wink 6 months ago committed by GitHub
parent af02ce1bc9
commit 2dabfc0815
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -30,4 +30,10 @@ public interface PipelineDefinitionParser {
* the {@link PipelineDef}.
*/
PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig) throws Exception;
/**
* Parse the specified pipeline definition string, merge global configurations, then generate
* the {@link PipelineDef}.
*/
PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig) throws Exception;
}

@ -82,13 +82,22 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
@Override
public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig)
throws Exception {
JsonNode root = mapper.readTree(pipelineDefPath.toFile());
return parse(mapper.readTree(pipelineDefPath.toFile()), globalPipelineConfig);
}
@Override
public PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig)
throws Exception {
return parse(mapper.readTree(pipelineDefText), globalPipelineConfig);
}
private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {
// Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
root.get(SOURCE_KEY),
pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
@ -96,13 +105,13 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
SinkDef sinkDef =
toSinkDef(
checkNotNull(
root.get(SINK_KEY),
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline definition",
SINK_KEY));
// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
Optional.ofNullable(root.get(TRANSFORM_KEY))
Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY))
.ifPresent(
node ->
node.forEach(
@ -110,11 +119,11 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// Routes are optional
List<RouteDef> routeDefs = new ArrayList<>();
Optional.ofNullable(root.get(ROUTE_KEY))
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY));
Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
// Merge user config into global config
Configuration pipelineConfig = new Configuration();

@ -238,6 +238,57 @@ class YamlPipelineDefinitionParserTest {
.put("schema-operator.rpc-timeout", "1 h")
.build()));
@Test
void testParsingFullDefinitionFromString() throws Exception {
String pipelineDefText =
"source:\n"
+ " type: mysql\n"
+ " name: source-database\n"
+ " host: localhost\n"
+ " port: 3306\n"
+ " username: admin\n"
+ " password: pass\n"
+ " tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*\n"
+ " chunk-column: app_order_.*:id,web_order:product_id\n"
+ " capture-new-tables: true\n"
+ "\n"
+ "sink:\n"
+ " type: kafka\n"
+ " name: sink-queue\n"
+ " bootstrap-servers: localhost:9092\n"
+ " auto-create-table: true\n"
+ "\n"
+ "route:\n"
+ " - source-table: mydb.default.app_order_.*\n"
+ " sink-table: odsdb.default.app_order\n"
+ " description: sync all sharding tables to one\n"
+ " - source-table: mydb.default.web_order\n"
+ " sink-table: odsdb.default.ods_web_order\n"
+ " description: sync table to with given prefix ods_\n"
+ "\n"
+ "transform:\n"
+ " - source-table: mydb.app_order_.*\n"
+ " projection: id, order_id, TO_UPPER(product_name)\n"
+ " filter: id > 10 AND order_id > 100\n"
+ " primary-keys: id\n"
+ " partition-keys: product_name\n"
+ " table-options: comment=app order\n"
+ " description: project fields from source table\n"
+ " - source-table: mydb.web_order_.*\n"
+ " projection: CONCAT(id, order_id) as uniq_id, *\n"
+ " filter: uniq_id > 10\n"
+ " description: add new uniq_id for each row\n"
+ "\n"
+ "pipeline:\n"
+ " name: source-database-sync-pipe\n"
+ " parallelism: 4\n"
+ " schema.change.behavior: evolve\n"
+ " schema-operator.rpc-timeout: 1 h";
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = parser.parse(pipelineDefText, new Configuration());
assertThat(pipelineDef).isEqualTo(fullDef);
}
private final PipelineDef fullDefWithGlobalConf =
new PipelineDef(
new SourceDef(

Loading…
Cancel
Save