[cdc-cli][cdc-dist] Support loading global config from FLINK_CDC_HOME ()

This closes .
pull/2823/head
Qingsheng Ren committed by GitHub
parent 9415260a25
commit 73086c4f37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -42,6 +42,7 @@ import java.util.stream.Collectors;
public class CliFrontend { public class CliFrontend {
private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
private static final String FLINK_HOME_ENV_VAR = "FLINK_HOME"; private static final String FLINK_HOME_ENV_VAR = "FLINK_HOME";
private static final String FLINK_CDC_HOME_ENV_VAR = "FLINK_CDC_HOME";
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Options cliOptions = CliFrontendOptions.initializeOptions(); Options cliOptions = CliFrontendOptions.initializeOptions();
@ -127,11 +128,27 @@ public class CliFrontend {
} }
private static Configuration getGlobalConfig(CommandLine commandLine) throws Exception { private static Configuration getGlobalConfig(CommandLine commandLine) throws Exception {
// Try to get global config path from command line
String globalConfig = commandLine.getOptionValue(CliFrontendOptions.GLOBAL_CONFIG); String globalConfig = commandLine.getOptionValue(CliFrontendOptions.GLOBAL_CONFIG);
if (globalConfig == null) { if (globalConfig != null) {
return new Configuration(); Path globalConfigPath = Paths.get(globalConfig);
LOG.info("Using global config in command line: {}", globalConfigPath);
return ConfigurationUtils.loadMapFormattedConfig(globalConfigPath);
} }
return ConfigurationUtils.loadMapFormattedConfig(Paths.get(globalConfig));
// Fallback to Flink CDC home
String flinkCdcHome = System.getenv(FLINK_CDC_HOME_ENV_VAR);
if (flinkCdcHome != null) {
Path globalConfigPath =
Paths.get(flinkCdcHome).resolve("conf").resolve("flink-cdc.yaml");
LOG.info("Using global config in FLINK_CDC_HOME: {}", globalConfigPath);
return ConfigurationUtils.loadMapFormattedConfig(globalConfigPath);
}
// Fallback to empty configuration
LOG.warn(
"Cannot find global configuration in command-line or FLINK_CDC_HOME. Will use empty global configuration.");
return new Configuration();
} }
private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) { private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) {

@ -148,7 +148,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
} }
private Configuration toPipelineConfig(JsonNode pipelineConfigNode) { private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
if (pipelineConfigNode == null) { if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
return new Configuration(); return new Configuration();
} }
Map<String, String> pipelineConfigMap = Map<String, String> pipelineConfigMap =

@ -44,6 +44,7 @@ FLINK_CONF_DIR=$FLINK_HOME/conf
# Define Flink CDC directories # Define Flink CDC directories
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
FLINK_CDC_HOME="$SCRIPT_DIR"/.. FLINK_CDC_HOME="$SCRIPT_DIR"/..
export FLINK_CDC_HOME=$FLINK_CDC_HOME
FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib
FLINK_CDC_LOG="$FLINK_CDC_HOME"/log FLINK_CDC_LOG="$FLINK_CDC_HOME"/log

@ -1,14 +1,20 @@
<!-- ################################################################################
Copyright 2023 Ververica Inc. # Copyright 2023 Ververica Inc.
#
Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
You may obtain a copy of the License at # You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, # Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an # software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the # KIND, either express or implied. See the License for the
specific language governing permissions and limitations # specific language governing permissions and limitations
under the License. # under the License.
--> ################################################################################
# Parallelism of the pipeline
parallelism: 4
# Behavior for handling schema change events from source
schema.change.behavior: EVOLVE

Loading…
Cancel
Save