[FLINK-36174][cdc-cli] CDC yaml without pipeline should not throw exception. (#3588)

pull/3502/merge
Hongshun Wang 3 months ago committed by GitHub
parent 2a5828c0ac
commit aa4f15a636
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -108,8 +108,11 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
// it's not of plain data types and must be removed before calling toPipelineConfig.
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
if (pipelineDefJsonNode.get(PIPELINE_KEY) != null) {
Optional.ofNullable(
((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
}
// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));

@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
import java.net.URL;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
@ -426,7 +427,9 @@ class YamlPipelineDefinitionParserTest {
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(Collections.singletonMap("parallelism", "1")));
Configuration.fromMap(
Collections.singletonMap(
"local-time-zone", ZoneId.systemDefault().toString())));
private final PipelineDef fullDefWithRouteRepSym =
new PipelineDef(

@ -19,6 +19,3 @@ source:
sink:
type: kafka
pipeline:
parallelism: 1
Loading…
Cancel
Save