From b10a666e3ebfc1a8fd0ac50167bee5cbf39bf84a Mon Sep 17 00:00:00 2001 From: Kunni Date: Tue, 9 Apr 2024 15:25:42 +0800 Subject: [PATCH] [FLINK-34613][cdc] Support recover from a specific savepoint file (#2959) --- .../legacy-flink-cdc-sources/mysql-cdc.md | 2 +- .../legacy-flink-cdc-sources/mysql-cdc.md | 2 +- .../org/apache/flink/cdc/cli/CliExecutor.java | 17 +++++-- .../org/apache/flink/cdc/cli/CliFrontend.java | 36 +++++++++++++- .../flink/cdc/cli/CliFrontendOptions.java | 36 +++++++++++++- .../cdc/cli/utils/FlinkEnvironmentUtils.java | 13 +++-- .../apache/flink/cdc/cli/CliFrontendTest.java | 49 ++++++++++++++++--- 7 files changed, 137 insertions(+), 18 deletions(-) diff --git a/docs/content.zh/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md index 46cc44d1b..ccae86cfa 100644 --- a/docs/content.zh/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md +++ b/docs/content.zh/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md @@ -717,7 +717,7 @@ _Step 3_: 从 savepoint 还原更新后的 Flink 作业。 ```shell $ ./bin/flink run \ --detached \ - --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \ + --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \ ./FlinkCDCExample.jar ``` **注意:** 请参考文档 [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) 了解更多详细信息。 diff --git a/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md b/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md index bc87a43e4..6937742e1 100644 --- a/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/legacy-flink-cdc-sources/mysql-cdc.md @@ -749,7 +749,7 @@ _Step 3_: Restore the updated Flink job from savepoint. ```shell $ ./bin/flink run \ --detached \ - --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \ + --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \ ./FlinkCDCExample.jar ``` **Note:** Please refer the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details. diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java index febe8b331..3e76607b2 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.PipelineComposer; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import java.nio.file.Path; import java.util.List; @@ -40,17 +41,21 @@ public class CliExecutor { private PipelineComposer composer = null; + private final SavepointRestoreSettings savepointSettings; + public CliExecutor( Path pipelineDefPath, Configuration flinkConfig, Configuration globalPipelineConfig, boolean useMiniCluster, - List additionalJars) { + List additionalJars, + SavepointRestoreSettings savepointSettings) { this.pipelineDefPath = pipelineDefPath; this.flinkConfig = flinkConfig; this.globalPipelineConfig = globalPipelineConfig; this.useMiniCluster = useMiniCluster; this.additionalJars = additionalJars; + this.savepointSettings = savepointSettings; } public PipelineExecution.ExecutionInfo run() throws Exception { @@ -60,7 +65,7 @@ public class CliExecutor { pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig); // Create composer - PipelineComposer composer = getComposer(flinkConfig); + PipelineComposer composer = getComposer(); // Compose pipeline PipelineExecution execution = composer.compose(pipelineDef); @@ -69,10 +74,10 @@ public class CliExecutor { return execution.execute(); } - private PipelineComposer getComposer(Configuration flinkConfig) { + private PipelineComposer getComposer() { if (composer == null) { return FlinkEnvironmentUtils.createComposer( - useMiniCluster, flinkConfig, additionalJars); + useMiniCluster, flinkConfig, additionalJars, savepointSettings); } return composer; } @@ -96,4 +101,8 @@ public class CliExecutor { public List getAdditionalJars() { return additionalJars; } + + public SavepointRestoreSettings getSavepointSettings() { + return savepointSettings; + } } diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java index cd54333d5..663f03942 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java @@ -22,6 +22,9 @@ import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.runtime.jobgraph.RestoreMode; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -40,6 +43,10 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION; +import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE; +import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION; + /** The frontend entrypoint for the command-line interface of Flink CDC. */ public class CliFrontend { private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); @@ -90,6 +97,9 @@ public class CliFrontend { Path flinkHome = getFlinkHome(commandLine); Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome); + // Savepoint + SavepointRestoreSettings savepointSettings = createSavepointRestoreSettings(commandLine); + // Additional JARs List additionalJars = Arrays.stream( @@ -105,7 +115,31 @@ public class CliFrontend { flinkConfig, globalPipelineConfig, commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER), - additionalJars); + additionalJars, + savepointSettings); + } + + private static SavepointRestoreSettings createSavepointRestoreSettings( + CommandLine commandLine) { + if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) { + String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt()); + boolean allowNonRestoredState = + commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt()); + final RestoreMode restoreMode; + if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) { + restoreMode = + org.apache.flink.configuration.ConfigurationUtils.convertValue( + commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE), + RestoreMode.class); + } else { + restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue(); + } + // allowNonRestoredState is always false because all operators are predefined. + return SavepointRestoreSettings.forPath( + savepointPath, allowNonRestoredState, restoreMode); + } else { + return SavepointRestoreSettings.none(); + } } private static Path getFlinkHome(CommandLine commandLine) { diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java index fd3507d52..320213285 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java @@ -53,12 +53,46 @@ public class CliFrontendOptions { .desc("Use Flink MiniCluster to run the pipeline") .build(); + public static final Option SAVEPOINT_PATH_OPTION = + Option.builder("s") + .longOpt("from-savepoint") + .hasArg(true) + .desc( + "Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537") + .build(); + + public static final Option SAVEPOINT_CLAIM_MODE = + Option.builder("cm") + .longOpt("claim-mode") + .hasArg(true) + .desc( + "Defines how should we restore from the given savepoint. Supported options: " + + "[claim - claim ownership of the savepoint and delete once it is" + + " subsumed, no_claim (default) - do not claim ownership, the first" + + " checkpoint will not reuse any files from the restored one, legacy " + + "- the old behaviour, do not assume ownership of the savepoint files," + + " but can reuse some shared files") + .build(); + + public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = + Option.builder("n") + .longOpt("allow-nonRestored-state") + .hasArg(false) + .desc( + "Allow to skip savepoint state that cannot be restored. " + + "You need to allow this if you removed an operator from your " + + "program that was part of the program when the savepoint was triggered.") + .build(); + public static Options initializeOptions() { return new Options() .addOption(HELP) .addOption(JAR) .addOption(FLINK_HOME) .addOption(GLOBAL_CONFIG) - .addOption(USE_MINI_CLUSTER); + .addOption(USE_MINI_CLUSTER) + .addOption(SAVEPOINT_PATH_OPTION) + .addOption(SAVEPOINT_CLAIM_MODE) + .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION); } } diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java index 15226a18f..4880fa47b 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.cdc.cli.utils; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import java.nio.file.Path; import java.util.List; @@ -35,12 +36,16 @@ public class FlinkEnvironmentUtils { } public static FlinkPipelineComposer createComposer( - boolean useMiniCluster, Configuration flinkConfig, List additionalJars) { + boolean useMiniCluster, + Configuration flinkConfig, + List additionalJars, + SavepointRestoreSettings savepointSettings) { if (useMiniCluster) { return FlinkPipelineComposer.ofMiniCluster(); } - return FlinkPipelineComposer.ofRemoteCluster( - org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()), - additionalJars); + org.apache.flink.configuration.Configuration configuration = + org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()); + SavepointRestoreSettings.toConfiguration(savepointSettings, configuration); + return FlinkPipelineComposer.ofRemoteCluster(configuration, additionalJars); } } diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java index 1f8a37236..32e250871 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java @@ -20,6 +20,7 @@ package org.apache.flink.cdc.cli; import org.apache.flink.cdc.composer.PipelineComposer; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.shaded.guava31.com.google.common.io.Resources; @@ -85,6 +86,25 @@ class CliFrontendTest { assertThat(executor.getGlobalPipelineConfig().toMap().get("foo")).isEqualTo("bar"); } + @Test + void testSavePointConfiguration() throws Exception { + CliExecutor executor = + createExecutor( + pipelineDef(), + "--flink-home", + flinkHome(), + "-s", + flinkHome() + "/savepoints/savepoint-1", + "-cm", + "no_claim", + "-n"); + assertThat(executor.getSavepointSettings().getRestorePath()) + .isEqualTo(flinkHome() + "/savepoints/savepoint-1"); + assertThat(executor.getSavepointSettings().getRestoreMode()) + .isEqualTo(RestoreMode.NO_CLAIM); + assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue(); + } + @Test void testAdditionalJar() throws Exception { String aJar = "/foo/jar/a.jar"; @@ -134,12 +154,29 @@ class CliFrontendTest { private static final String HELP_MESSAGE = "usage:\n" - + " --flink-home Path of Flink home directory\n" - + " --global-config Path of the global configuration file for Flink\n" - + " CDC pipelines\n" - + " -h,--help Display help message\n" - + " --jar JARs to be submitted together with the pipeline\n" - + " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n"; + + " -cm,--claim-mode Defines how should we restore from the given\n" + + " savepoint. Supported options: [claim - claim\n" + + " ownership of the savepoint and delete once it\n" + + " is subsumed, no_claim (default) - do not\n" + + " claim ownership, the first checkpoint will\n" + + " not reuse any files from the restored one,\n" + + " legacy - the old behaviour, do not assume\n" + + " ownership of the savepoint files, but can\n" + + " reuse some shared files\n" + + " --flink-home Path of Flink home directory\n" + + " --global-config Path of the global configuration file for\n" + + " Flink CDC pipelines\n" + + " -h,--help Display help message\n" + + " --jar JARs to be submitted together with the\n" + + " pipeline\n" + + " -n,--allow-nonRestored-state Allow to skip savepoint state that cannot be\n" + + " restored. You need to allow this if you\n" + + " removed an operator from your program that\n" + + " was part of the program when the savepoint\n" + + " was triggered.\n" + + " -s,--from-savepoint Path to a savepoint to restore the job from\n" + + " (for example hdfs:///flink/savepoint-1537\n" + + " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n"; private static class NoOpComposer implements PipelineComposer {