diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java index b594aa35b..8e3d84eed 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -48,6 +48,7 @@ import java.util.stream.Stream; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR; import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions; +import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument; import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; /** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */ @@ -136,21 +137,18 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR); // Source is required - SourceDef sourceDef = - toSourceDef( - checkNotNull( - pipelineDefJsonNode.get(SOURCE_KEY), - "Missing required field \"%s\" in pipeline definition", - SOURCE_KEY)); + checkArgument( + pipelineDefJsonNode.get(SOURCE_KEY) != null, + "Missing required field \"%s\" in pipeline definition", + SOURCE_KEY); + SourceDef sourceDef = toSourceDef(pipelineDefJsonNode.get(SOURCE_KEY)); // Sink is required - SinkDef sinkDef = - toSinkDef( - checkNotNull( - pipelineDefJsonNode.get(SINK_KEY), - "Missing required field \"%s\" in pipeline definition", - SINK_KEY), - schemaChangeBehavior); + checkArgument( + pipelineDefJsonNode.get(SINK_KEY) != null, + "Missing required field \"%s\" in pipeline definition", + SINK_KEY); + SinkDef sinkDef = toSinkDef(pipelineDefJsonNode.get(SINK_KEY), schemaChangeBehavior); // Transforms are optional List<TransformDef> transformDefs = new ArrayList<>(); diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 27cbbb1de..6415286cd 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -34,6 +34,7 @@ import org.apache.flink.shaded.guava31.com.google.common.io.Resources; import org.junit.jupiter.api.Test; +import java.io.FileNotFoundException; import java.net.URL; import java.nio.file.Paths; import java.time.Duration; @@ -53,6 +54,7 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_T import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.junit.Assert.assertThrows; /** Unit test for {@link org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser}. */ class YamlPipelineDefinitionParserTest { @@ -81,6 +83,38 @@ class YamlPipelineDefinitionParserTest { assertThat(pipelineDef).isEqualTo(minimizedDef); } + @Test + void testEmptyDefinition() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> { + URL resource = + Resources.getResource( + "definitions/pipeline-definition-empty.yaml"); + YamlPipelineDefinitionParser parser = + new YamlPipelineDefinitionParser(); + parser.parse(Paths.get(resource.toURI()), new Configuration()); + }); + assertThat(exception.getMessage()) + .isEqualTo("Missing required field \"source\" in pipeline definition"); + } + + @Test + void testDefinitionFileNotExist() { + FileNotFoundException exception = + assertThrows( + FileNotFoundException.class, + () -> { + YamlPipelineDefinitionParser parser = + new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = + parser.parse(Paths.get("not-exist"), new Configuration()); + assertThat(pipelineDef).isEqualTo(minimizedDef); + }); + assertThat(exception.getMessage()).isEqualTo("not-exist (No such file or directory)"); + } + @Test void testOverridingGlobalConfig() throws Exception { URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml"); diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-empty.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-empty.yaml new file mode 100644 index 000000000..bbaac6e3e --- /dev/null +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-empty.yaml @@ -0,0 +1,16 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################