[FLINK-36974][cdc-cli] Support overwrite flink configuration via command line

This closes #3823

Co-authored-by: helloliuxg@gmail.com <xiaogenliu@kugou.net>
pull/3824/head^2
hiliuxg 2 weeks ago committed by GitHub
parent 865e14bfd7
commit 6b65aed4c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -20,7 +20,10 @@ package org.apache.flink.cdc.cli;
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@ -39,8 +42,10 @@ import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.cli.CliFrontendOptions.FLINK_CONFIG;
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE;
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION;
@ -91,6 +96,9 @@ public class CliFrontend {
Path flinkHome = getFlinkHome(commandLine);
Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
// To override the Flink configuration
overrideFlinkConfiguration(flinkConfig, commandLine);
// Savepoint
SavepointRestoreSettings savepointSettings = createSavepointRestoreSettings(commandLine);
@ -114,6 +122,25 @@ public class CliFrontend {
savepointSettings);
}
/**
 * Applies every {@code -Dkey=value} pair from the command line onto the given Flink
 * configuration, overriding values previously loaded from the Flink home config files.
 *
 * @param flinkConfig configuration to mutate in place
 * @param commandLine parsed command line holding the dynamic {@code -D} properties
 * @throws IllegalArgumentException if any key or value is null or whitespace-only
 */
private static void overrideFlinkConfiguration(
        Configuration flinkConfig, CommandLine commandLine) {
    Properties dynamicProps = commandLine.getOptionProperties(FLINK_CONFIG.getOpt());
    LOG.info("Dynamic flink config items found: {}", dynamicProps);
    for (String rawKey : dynamicProps.stringPropertyNames()) {
        String rawValue = dynamicProps.getProperty(rawKey);
        // Fail fast on blank keys/values instead of silently writing broken config entries.
        if (StringUtils.isNullOrWhitespaceOnly(rawKey)
                || StringUtils.isNullOrWhitespaceOnly(rawValue)) {
            throw new IllegalArgumentException(
                    String.format(
                            "null or white space argument for key or value: %s=%s",
                            rawKey, rawValue));
        }
        String trimmedValue = rawValue.trim();
        // Build an ad-hoc string option so arbitrary user-supplied keys can be written
        // through the typed Configuration#set API.
        ConfigOption<String> dynamicOption =
                ConfigOptions.key(rawKey.trim()).stringType().defaultValue(trimmedValue);
        flinkConfig.set(dynamicOption, trimmedValue);
    }
}
private static SavepointRestoreSettings createSavepointRestoreSettings(
CommandLine commandLine) {
if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {

@ -94,6 +94,17 @@ public class CliFrontendOptions {
+ "program that was part of the program when the savepoint was triggered.")
.build();
/**
 * Generic {@code -D key=value} option used to override entries of the loaded Flink
 * configuration. May be repeated; each occurrence contributes one key/value pair.
 */
public static final Option FLINK_CONFIG =
        Option.builder("D")
                .required(false)
                .numberOfArgs(2)
                .valueSeparator('=')
                .argName("Session dynamic flink config key=val")
                .desc(
                        // NOTE(review): added the missing space between "available" and
                        // "options" — the original rendered as "availableoptions" in help.
                        "Allows specifying multiple flink generic configuration options. The available "
                                + "options can be found at https://nightlies.apache.org/flink/flink-docs-stable/ops/config.html")
                .build();
public static Options initializeOptions() {
return new Options()
.addOption(HELP)
@ -105,6 +116,7 @@ public class CliFrontendOptions {
.addOption(USE_MINI_CLUSTER)
.addOption(SAVEPOINT_PATH_OPTION)
.addOption(SAVEPOINT_CLAIM_MODE)
.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION)
.addOption(FLINK_CONFIG);
}
}

@ -27,6 +27,7 @@ import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -34,6 +35,7 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.nio.file.Paths;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@ -145,6 +147,70 @@ class CliFrontendTest {
assertThat(executionInfo.getDescription()).isEqualTo("fake-description");
}
@Test
void testPipelineExecutingWithFlinkConfig() throws Exception {
    // Submit to a (fake) remote yarn-session target, overriding Flink config entries via
    // repeated -D options; whitespace around keys/values is expected to be trimmed.
    String[] args = {
        pipelineDef(),
        "--flink-home", flinkHome(),
        "--global-config", globalPipelineConfig(),
        "-D", "execution.target= yarn-session",
        "-D", "rest.bind-port =42689",
        "-D", "yarn.application.id=application_1714009558476_3563",
        "-D", "rest.bind-address=10.1.140.140"
    };
    CliExecutor executor = createExecutor(args);
    Map<String, String> flinkConfigMap = executor.getFlinkConfig().toMap();
    assertThat(flinkConfigMap)
            .containsEntry("execution.target", "yarn-session")
            .containsEntry("rest.bind-port", "42689")
            .containsEntry("yarn.application.id", "application_1714009558476_3563")
            .containsEntry("rest.bind-address", "10.1.140.140");
}
/** Blank keys or values passed via {@code -D} must be rejected with a descriptive error. */
@Test
void testPipelineExecutingWithUnValidFlinkConfig() throws Exception {
    // Uses the statically imported assertThatThrownBy for consistency with the rest of
    // this test class, and spells the expected messages out literally instead of
    // routing constant arguments through String.format.
    // Blank key.
    assertThatThrownBy(
                    () ->
                            createExecutor(
                                    pipelineDef(),
                                    "--flink-home",
                                    flinkHome(),
                                    "-D",
                                    "=execution.target"))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("null or white space argument for key or value: =execution.target");
    // Blank value.
    assertThatThrownBy(
                    () ->
                            createExecutor(
                                    pipelineDef(),
                                    "--flink-home",
                                    flinkHome(),
                                    "-D",
                                    "execution.target="))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("null or white space argument for key or value: execution.target=");
    // Blank key and blank value.
    assertThatThrownBy(
                    () -> createExecutor(pipelineDef(), "--flink-home", flinkHome(), "-D", "="))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("null or white space argument for key or value: =");
}
private CliExecutor createExecutor(String... args) throws Exception {
Options cliOptions = CliFrontendOptions.initializeOptions();
CommandLineParser parser = new DefaultParser();
@ -168,33 +234,50 @@ class CliFrontendTest {
private static final String HELP_MESSAGE =
"usage:\n"
+ " -cm,--claim-mode <arg> Defines how should we restore from the given\n"
+ " savepoint. Supported options: [claim - claim\n"
+ " ownership of the savepoint and delete once it\n"
+ " is subsumed, no_claim (default) - do not\n"
+ " claim ownership, the first checkpoint will\n"
+ " not reuse any files from the restored one,\n"
+ " legacy - the old behaviour, do not assume\n"
+ " ownership of the savepoint files, but can\n"
+ " reuse some shared files\n"
+ " --flink-home <arg> Path of Flink home directory\n"
+ " --global-config <arg> Path of the global configuration file for\n"
+ " Flink CDC pipelines\n"
+ " -h,--help Display help message\n"
+ " --jar <arg> JARs to be submitted together with the\n"
+ " pipeline\n"
+ " -n,--allow-nonRestored-state Allow to skip savepoint state that cannot be\n"
+ " restored. You need to allow this if you\n"
+ " removed an operator from your program that\n"
+ " was part of the program when the savepoint\n"
+ " was triggered.\n"
+ " -s,--from-savepoint <arg> Path to a savepoint to restore the job from\n"
+ " (for example hdfs:///flink/savepoint-1537\n"
+ " -t,--target <arg> The deployment target for the execution. This\n"
+ " can take one of the following values\n"
+ " local/remote/yarn-session/yarn-application/ku\n"
+ " bernetes-session/kubernetes-application\n"
+ " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n";
+ " -cm,--claim-mode <arg> Defines how should we restore\n"
+ " from the given savepoint.\n"
+ " Supported options: [claim -\n"
+ " claim ownership of the savepoint\n"
+ " and delete once it is subsumed,\n"
+ " no_claim (default) - do not\n"
+ " claim ownership, the first\n"
+ " checkpoint will not reuse any\n"
+ " files from the restored one,\n"
+ " legacy - the old behaviour, do\n"
+ " not assume ownership of the\n"
+ " savepoint files, but can reuse\n"
+ " some shared files\n"
+ " -D <Session dynamic flink config key=val> Allows specifying multiple flink\n"
+ " generic configuration options.\n"
+ " The availableoptions can be\n"
+ " found at\n"
+ " https://nightlies.apache.org/fli\n"
+ " nk/flink-docs-stable/ops/config.\n"
+ " html\n"
+ " --flink-home <arg> Path of Flink home directory\n"
+ " --global-config <arg> Path of the global configuration\n"
+ " file for Flink CDC pipelines\n"
+ " -h,--help Display help message\n"
+ " --jar <arg> JARs to be submitted together\n"
+ " with the pipeline\n"
+ " -n,--allow-nonRestored-state Allow to skip savepoint state\n"
+ " that cannot be restored. You\n"
+ " need to allow this if you\n"
+ " removed an operator from your\n"
+ " program that was part of the\n"
+ " program when the savepoint was\n"
+ " triggered.\n"
+ " -s,--from-savepoint <arg> Path to a savepoint to restore\n"
+ " the job from (for example\n"
+ " hdfs:///flink/savepoint-1537\n"
+ " -t,--target <arg> The deployment target for the\n"
+ " execution. This can take one of\n"
+ " the following values\n"
+ " local/remote/yarn-session/yarn-a\n"
+ " pplication/kubernetes-session/ku\n"
+ " bernetes-application\n"
+ " --use-mini-cluster Use Flink MiniCluster to run the\n"
+ " pipeline\n";
private static class NoOpComposer implements PipelineComposer {
