From 73086c4f37dbfd8a4ab9e4b99a9bd030aefe3982 Mon Sep 17 00:00:00 2001 From: Qingsheng Ren Date: Wed, 6 Dec 2023 13:43:28 +0800 Subject: [PATCH] [cdc-cli][cdc-dist] Support loading global config from FLINK_CDC_HOME (#2822) This closes #2822. --- .../com/ververica/cdc/cli/CliFrontend.java | 23 +++++++++++-- .../parser/YamlPipelineDefinitionParser.java | 2 +- .../src/main/flink-cdc-bin/bin/flink-cdc.sh | 1 + .../main/flink-cdc-bin/conf/flink-cdc.yaml | 34 +++++++++++-------- 4 files changed, 42 insertions(+), 18 deletions(-) diff --git a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliFrontend.java index 1b3b79f32..57556dcc9 100644 --- a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliFrontend.java +++ b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/CliFrontend.java @@ -42,6 +42,7 @@ import java.util.stream.Collectors; public class CliFrontend { private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); private static final String FLINK_HOME_ENV_VAR = "FLINK_HOME"; + private static final String FLINK_CDC_HOME_ENV_VAR = "FLINK_CDC_HOME"; public static void main(String[] args) throws Exception { Options cliOptions = CliFrontendOptions.initializeOptions(); @@ -127,11 +128,27 @@ public class CliFrontend { } private static Configuration getGlobalConfig(CommandLine commandLine) throws Exception { + // Try to get global config path from command line String globalConfig = commandLine.getOptionValue(CliFrontendOptions.GLOBAL_CONFIG); - if (globalConfig == null) { - return new Configuration(); + if (globalConfig != null) { + Path globalConfigPath = Paths.get(globalConfig); + LOG.info("Using global config in command line: {}", globalConfigPath); + return ConfigurationUtils.loadMapFormattedConfig(globalConfigPath); } - return ConfigurationUtils.loadMapFormattedConfig(Paths.get(globalConfig)); + + // Fallback to Flink CDC home + String flinkCdcHome = System.getenv(FLINK_CDC_HOME_ENV_VAR); + if (flinkCdcHome != null) { + Path globalConfigPath = + Paths.get(flinkCdcHome).resolve("conf").resolve("flink-cdc.yaml"); + LOG.info("Using global config in FLINK_CDC_HOME: {}", globalConfigPath); + return ConfigurationUtils.loadMapFormattedConfig(globalConfigPath); + } + + // Fallback to empty configuration + LOG.warn( + "Cannot find global configuration in command-line or FLINK_CDC_HOME. Will use empty global configuration."); + return new Configuration(); } private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) { diff --git a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java index 700400933..1ae0153e0 100644 --- a/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/com/ververica/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -148,7 +148,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { } private Configuration toPipelineConfig(JsonNode pipelineConfigNode) { - if (pipelineConfigNode == null) { + if (pipelineConfigNode == null || pipelineConfigNode.isNull()) { return new Configuration(); } Map pipelineConfigMap = diff --git a/flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh b/flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh index d654f4cf2..eb98e0cf1 100644 --- a/flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh +++ b/flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh @@ -44,6 +44,7 @@ FLINK_CONF_DIR=$FLINK_HOME/conf # Define Flink CDC directories SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) FLINK_CDC_HOME="$SCRIPT_DIR"/.. +export FLINK_CDC_HOME=$FLINK_CDC_HOME FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib FLINK_CDC_LOG="$FLINK_CDC_HOME"/log diff --git a/flink-cdc-dist/src/main/flink-cdc-bin/conf/flink-cdc.yaml b/flink-cdc-dist/src/main/flink-cdc-bin/conf/flink-cdc.yaml index a03d876a3..11df74b7f 100644 --- a/flink-cdc-dist/src/main/flink-cdc-bin/conf/flink-cdc.yaml +++ b/flink-cdc-dist/src/main/flink-cdc-bin/conf/flink-cdc.yaml @@ -1,14 +1,20 @@ - +################################################################################ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +################################################################################ + +# Parallelism of the pipeline +parallelism: 4 + +# Behavior for handling schema change events from source +schema.change.behavior: EVOLVE