From aa4f15a6364227d39fa2c586b4fdac507f627fc7 Mon Sep 17 00:00:00 2001 From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com> Date: Tue, 29 Oct 2024 11:12:38 +0800 Subject: [PATCH] [FLINK-36174][cdc-cli] CDC yaml without pipeline should not throw exception. (#3588) --- .../flink/cdc/cli/parser/YamlPipelineDefinitionParser.java | 7 +++++-- .../cdc/cli/parser/YamlPipelineDefinitionParserTest.java | 5 ++++- .../definitions/pipeline-definition-minimized.yaml | 3 --- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java index 8179fdbff..9d2f4c998 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -108,8 +108,11 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { // UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since // it's not of plain data types and must be removed before calling toPipelineConfig. List udfDefs = new ArrayList<>(); - Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY)) - .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf)))); + if (pipelineDefJsonNode.get(PIPELINE_KEY) != null) { + Optional.ofNullable( + ((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY)) + .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf)))); + } // Pipeline configs are optional Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY)); diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 2ecf45870..1291b34ea 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test; import java.net.URL; import java.nio.file.Paths; import java.time.Duration; +import java.time.ZoneId; import java.util.Arrays; import java.util.Collections; @@ -426,7 +427,9 @@ class YamlPipelineDefinitionParserTest { Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), - Configuration.fromMap(Collections.singletonMap("parallelism", "1"))); + Configuration.fromMap( + Collections.singletonMap( + "local-time-zone", ZoneId.systemDefault().toString()))); private final PipelineDef fullDefWithRouteRepSym = new PipelineDef( diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-minimized.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-minimized.yaml index 20808e472..1ebeede7a 100644 --- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-minimized.yaml +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-minimized.yaml @@ -19,6 +19,3 @@ source: sink: type: kafka - -pipeline: - parallelism: 1 \ No newline at end of file