[FLINK-36829] Add better validation of pipeline config

pull/3851/head
zhangchaoming.zcm 3 weeks ago
parent 92081dfe58
commit 0e9879037b

@ -48,6 +48,7 @@ import java.util.stream.Stream;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */
@ -136,21 +137,18 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
// Source is required
SourceDef sourceDef =
toSourceDef(
checkNotNull(
pipelineDefJsonNode.get(SOURCE_KEY),
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY));
checkArgument(
pipelineDefJsonNode.get(SOURCE_KEY) != null,
"Missing required field \"%s\" in pipeline definition",
SOURCE_KEY);
SourceDef sourceDef = toSourceDef(pipelineDefJsonNode.get(SOURCE_KEY));
// Sink is required
SinkDef sinkDef =
toSinkDef(
checkNotNull(
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline definition",
SINK_KEY),
schemaChangeBehavior);
checkArgument(
pipelineDefJsonNode.get(SINK_KEY) != null,
"Missing required field \"%s\" in pipeline definition",
SINK_KEY);
SinkDef sinkDef = toSinkDef(pipelineDefJsonNode.get(SINK_KEY), schemaChangeBehavior);
// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();

@ -34,6 +34,7 @@ import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
import java.io.FileNotFoundException;
import java.net.URL;
import java.nio.file.Paths;
import java.time.Duration;
@ -53,6 +54,7 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_T
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.junit.Assert.assertThrows;
/** Unit test for {@link org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser}. */
class YamlPipelineDefinitionParserTest {
@ -81,6 +83,38 @@ class YamlPipelineDefinitionParserTest {
assertThat(pipelineDef).isEqualTo(minimizedDef);
}
@Test
void testEmptyDefinition() {
IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() -> {
URL resource =
Resources.getResource(
"definitions/pipeline-definition-empty.yaml");
YamlPipelineDefinitionParser parser =
new YamlPipelineDefinitionParser();
parser.parse(Paths.get(resource.toURI()), new Configuration());
});
assertThat(exception.getMessage())
.isEqualTo("Missing required field \"source\" in pipeline definition");
}
@Test
void testDefinitionFileNotExist() {
FileNotFoundException exception =
assertThrows(
FileNotFoundException.class,
() -> {
YamlPipelineDefinitionParser parser =
new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
parser.parse(Paths.get("not-exist"), new Configuration());
assertThat(pipelineDef).isEqualTo(minimizedDef);
});
assertThat(exception.getMessage()).isEqualTo("not-exist (No such file or directory)");
}
@Test
void testOverridingGlobalConfig() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");

@ -0,0 +1,16 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
Loading…
Cancel
Save