From 6b65aed4c5da3284214a08082e01b4768aabb67c Mon Sep 17 00:00:00 2001 From: hiliuxg <39675622+hiliuxg@users.noreply.github.com> Date: Tue, 14 Jan 2025 21:37:34 +0800 Subject: [PATCH] [FLINK-36974][cdc-cli] Support overwrite flink configuration via command line This closes #3823 Co-authored-by: helloliuxg@gmail.com --- .../org/apache/flink/cdc/cli/CliFrontend.java | 27 ++++ .../flink/cdc/cli/CliFrontendOptions.java | 14 +- .../apache/flink/cdc/cli/CliFrontendTest.java | 137 ++++++++++++++---- 3 files changed, 150 insertions(+), 28 deletions(-) diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java index 27cb95ec9..40b4061f6 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java @@ -20,7 +20,10 @@ package org.apache.flink.cdc.cli; import org.apache.flink.cdc.cli.utils.ConfigurationUtils; import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils; import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -39,8 +42,10 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.Properties; import java.util.stream.Collectors; +import static org.apache.flink.cdc.cli.CliFrontendOptions.FLINK_CONFIG; import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION; import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE; 
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION;
@@ -91,6 +96,9 @@ public class CliFrontend {
         Path flinkHome = getFlinkHome(commandLine);
         Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);

+        // To override the Flink configuration
+        overrideFlinkConfiguration(flinkConfig, commandLine);
+
         // Savepoint
         SavepointRestoreSettings savepointSettings = createSavepointRestoreSettings(commandLine);

@@ -114,6 +122,39 @@ public class CliFrontend {
                 savepointSettings);
     }

+    /**
+     * Applies the dynamic properties passed as {@code -D key=value} on top of the given Flink
+     * configuration.
+     *
+     * <p>Keys and values are trimmed before they are stored.
+     *
+     * @param flinkConfig configuration to update in place
+     * @param commandLine parsed command line holding zero or more {@code -D} options
+     * @throws IllegalArgumentException if any key or value is null, empty or whitespace only
+     */
+    private static void overrideFlinkConfiguration(
+            Configuration flinkConfig, CommandLine commandLine) {
+        Properties properties = commandLine.getOptionProperties(FLINK_CONFIG.getOpt());
+        // Log the keys only: values given on the command line may contain credentials.
+        LOG.info("Dynamic flink config items found: {}", properties.stringPropertyNames());
+        for (String key : properties.stringPropertyNames()) {
+            String value = properties.getProperty(key);
+            if (StringUtils.isNullOrWhitespaceOnly(key)
+                    || StringUtils.isNullOrWhitespaceOnly(value)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "null or white space argument for key or value: %s=%s",
+                                key, value));
+            }
+            // The option is only a typed handle for the key; the value is supplied by set(),
+            // so no default is attached to it.
+            ConfigOption<String> configOption =
+                    ConfigOptions.key(key.trim()).stringType().noDefaultValue();
+            flinkConfig.set(configOption, value.trim());
+        }
+    }
+
     private static SavepointRestoreSettings createSavepointRestoreSettings(
             CommandLine commandLine) {
         if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
index adf39a40b..e14d524f3 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java
@@ -94,6 +94,17 @@ public class CliFrontendOptions {
                             + "program that was part of the program when the savepoint was triggered.")
                     .build();

+    public static final Option FLINK_CONFIG =
+            Option.builder("D")
+                    .required(false)
+                    .numberOfArgs(2)
+                    .valueSeparator('=')
+                    .argName("Session dynamic flink config key=val")
+                    .desc(
+                            "Allows specifying multiple flink generic configuration options. The available "
+                                    + "options can be found at https://nightlies.apache.org/flink/flink-docs-stable/ops/config.html")
+                    .build();
+
     public static Options initializeOptions() {
         return new Options()
                 .addOption(HELP)
@@ -105,6 +116,7 @@ public class CliFrontendOptions {
                 .addOption(USE_MINI_CLUSTER)
                 .addOption(SAVEPOINT_PATH_OPTION)
                 .addOption(SAVEPOINT_CLAIM_MODE)
-                .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
+                .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION)
+                .addOption(FLINK_CONFIG);
     }
 }
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
index 501b28840..426de1196 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

@@ -34,6 +35,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.util.Map;

 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -145,6 +147,72 @@ class CliFrontendTest {
         assertThat(executionInfo.getDescription()).isEqualTo("fake-description");
     }

+    /** Verifies that {@code -D key=value} arguments are trimmed and reach the Flink config. */
+    @Test
+    void testPipelineExecutingWithFlinkConfig() throws Exception {
+        // Overrides that point the job at an existing YARN session on a remote host
+        CliExecutor executor =
+                createExecutor(
+                        pipelineDef(),
+                        "--flink-home",
+                        flinkHome(),
+                        "--global-config",
+                        globalPipelineConfig(),
+                        "-D",
+                        "execution.target= yarn-session",
+                        "-D",
+                        "rest.bind-port =42689",
+                        "-D",
+                        "yarn.application.id=application_1714009558476_3563",
+                        "-D",
+                        "rest.bind-address=10.1.140.140");
+        Map<String, String> configMap = executor.getFlinkConfig().toMap();
+        assertThat(configMap)
+                .containsEntry("execution.target", "yarn-session")
+                .containsEntry("rest.bind-port", "42689")
+                .containsEntry("yarn.application.id", "application_1714009558476_3563")
+                .containsEntry("rest.bind-address", "10.1.140.140");
+    }
+
+    /** Verifies that a {@code -D} argument with a blank key or value is rejected. */
+    @Test
+    void testPipelineExecutingWithUnValidFlinkConfig() throws Exception {
+        Assertions.assertThatThrownBy(
+                        () ->
+                                createExecutor(
+                                        pipelineDef(),
+                                        "--flink-home",
+                                        flinkHome(),
+                                        "-D",
+                                        "=execution.target"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        String.format(
+                                "null or white space argument for key or value: %s=%s",
+                                "", "execution.target"));
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                createExecutor(
+                                        pipelineDef(),
+                                        "--flink-home",
+                                        flinkHome(),
+                                        "-D",
+                                        "execution.target="))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        String.format(
+                                "null or white space argument for key or value: %s=%s",
+                                "execution.target", ""));
+
+        Assertions.assertThatThrownBy(
+                        () -> createExecutor(pipelineDef(), "--flink-home", flinkHome(), "-D", "="))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        String.format(
+                                "null or white space argument for key or value: %s=%s", "", ""));
+    }
+
     private CliExecutor createExecutor(String... args) throws Exception {
         Options cliOptions = CliFrontendOptions.initializeOptions();
         CommandLineParser parser = new DefaultParser();
@@ -168,33 +236,50 @@ class CliFrontendTest {

     private static final String HELP_MESSAGE =
             "usage:\n"
-                    + " -cm,--claim-mode Defines how should we restore from the given\n"
-                    + " savepoint. Supported options: [claim - claim\n"
-                    + " ownership of the savepoint and delete once it\n"
-                    + " is subsumed, no_claim (default) - do not\n"
-                    + " claim ownership, the first checkpoint will\n"
-                    + " not reuse any files from the restored one,\n"
-                    + " legacy - the old behaviour, do not assume\n"
-                    + " ownership of the savepoint files, but can\n"
-                    + " reuse some shared files\n"
-                    + " --flink-home Path of Flink home directory\n"
-                    + " --global-config Path of the global configuration file for\n"
-                    + " Flink CDC pipelines\n"
-                    + " -h,--help Display help message\n"
-                    + " --jar JARs to be submitted together with the\n"
-                    + " pipeline\n"
-                    + " -n,--allow-nonRestored-state Allow to skip savepoint state that cannot be\n"
-                    + " restored. You need to allow this if you\n"
-                    + " removed an operator from your program that\n"
-                    + " was part of the program when the savepoint\n"
-                    + " was triggered.\n"
-                    + " -s,--from-savepoint Path to a savepoint to restore the job from\n"
-                    + " (for example hdfs:///flink/savepoint-1537\n"
-                    + " -t,--target The deployment target for the execution. This\n"
-                    + " can take one of the following values\n"
-                    + " local/remote/yarn-session/yarn-application/ku\n"
-                    + " bernetes-session/kubernetes-application\n"
-                    + " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n";
+                    + " -cm,--claim-mode Defines how should we restore\n"
+                    + " from the given savepoint.\n"
+                    + " Supported options: [claim -\n"
+                    + " claim ownership of the savepoint\n"
+                    + " and delete once it is subsumed,\n"
+                    + " no_claim (default) - do not\n"
+                    + " claim ownership, the first\n"
+                    + " checkpoint will not reuse any\n"
+                    + " files from the restored one,\n"
+                    + " legacy - the old behaviour, do\n"
+                    + " not assume ownership of the\n"
+                    + " savepoint files, but can reuse\n"
+                    + " some shared files\n"
+                    + " -D Allows specifying multiple flink\n"
+                    + " generic configuration options.\n"
+                    + " The available options can be\n"
+                    + " found at\n"
+                    + " https://nightlies.apache.org/fli\n"
+                    + " nk/flink-docs-stable/ops/config.\n"
+                    + " html\n"
+                    + " --flink-home Path of Flink home directory\n"
+                    + " --global-config Path of the global configuration\n"
+                    + " file for Flink CDC pipelines\n"
+                    + " -h,--help Display help message\n"
+                    + " --jar JARs to be submitted together\n"
+                    + " with the pipeline\n"
+                    + " -n,--allow-nonRestored-state Allow to skip savepoint state\n"
+                    + " that cannot be restored. You\n"
+                    + " need to allow this if you\n"
+                    + " removed an operator from your\n"
+                    + " program that was part of the\n"
+                    + " program when the savepoint was\n"
+                    + " triggered.\n"
+                    + " -s,--from-savepoint Path to a savepoint to restore\n"
+                    + " the job from (for example\n"
+                    + " hdfs:///flink/savepoint-1537\n"
+                    + " -t,--target The deployment target for the\n"
+                    + " execution. This can take one of\n"
+                    + " the following values\n"
+                    + " local/remote/yarn-session/yarn-a\n"
+                    + " pplication/kubernetes-session/ku\n"
+                    + " bernetes-application\n"
+                    + " --use-mini-cluster Use Flink MiniCluster to run the\n"
+                    + " pipeline\n";

     private static class NoOpComposer implements PipelineComposer {