[FLINK-34613][cdc] Support recover from a specific savepoint file (#2959)

pull/3129/head
Kunni authored 10 months ago, committed by GitHub
parent 6510e670fa
commit b10a666e3e

@@ -717,7 +717,7 @@ _Step 3_: Restore the updated Flink job from savepoint.
```shell
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
--from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./FlinkCDCExample.jar
```
**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.

@@ -749,7 +749,7 @@ _Step 3_: Restore the updated Flink job from savepoint.
```shell
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
--from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./FlinkCDCExample.jar
```
**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.

@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import java.nio.file.Path;
import java.util.List;
@@ -40,17 +41,21 @@ public class CliExecutor {
private PipelineComposer composer = null;
private final SavepointRestoreSettings savepointSettings;
public CliExecutor(
Path pipelineDefPath,
Configuration flinkConfig,
Configuration globalPipelineConfig,
boolean useMiniCluster,
List<Path> additionalJars) {
List<Path> additionalJars,
SavepointRestoreSettings savepointSettings) {
this.pipelineDefPath = pipelineDefPath;
this.flinkConfig = flinkConfig;
this.globalPipelineConfig = globalPipelineConfig;
this.useMiniCluster = useMiniCluster;
this.additionalJars = additionalJars;
this.savepointSettings = savepointSettings;
}
public PipelineExecution.ExecutionInfo run() throws Exception {
@@ -60,7 +65,7 @@ public class CliExecutor {
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
// Create composer
PipelineComposer composer = getComposer(flinkConfig);
PipelineComposer composer = getComposer();
// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);
@@ -69,10 +74,10 @@ public class CliExecutor {
return execution.execute();
}
private PipelineComposer getComposer(Configuration flinkConfig) {
private PipelineComposer getComposer() {
if (composer == null) {
return FlinkEnvironmentUtils.createComposer(
useMiniCluster, flinkConfig, additionalJars);
useMiniCluster, flinkConfig, additionalJars, savepointSettings);
}
return composer;
}
@@ -96,4 +101,8 @@ public class CliExecutor {
public List<Path> getAdditionalJars() {
return additionalJars;
}
public SavepointRestoreSettings getSavepointSettings() {
return savepointSettings;
}
}
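
With this change, `CliExecutor` receives the savepoint restore settings through its constructor and hands them to the composer. Below is a minimal sketch (not part of this commit) of constructing the executor directly; the pipeline file name `mysql-to-doris.yaml`, the savepoint path, and the empty `Configuration` instances are placeholders for what `CliFrontend` normally supplies.

```java
import org.apache.flink.cdc.cli.CliExecutor;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.nio.file.Paths;
import java.util.Collections;

public class CliExecutorSavepointSketch {

    public static void main(String[] args) throws Exception {
        // Equivalent of passing "-s <path> -cm no_claim" on the command line.
        SavepointRestoreSettings savepointSettings =
                SavepointRestoreSettings.forPath(
                        "/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab",
                        false,
                        RestoreMode.NO_CLAIM);

        // Placeholder pipeline definition file and empty configurations;
        // CliFrontend normally loads the real Flink and global pipeline configs.
        CliExecutor executor =
                new CliExecutor(
                        Paths.get("mysql-to-doris.yaml"),
                        new Configuration(),
                        new Configuration(),
                        false,
                        Collections.emptyList(),
                        savepointSettings);

        executor.run();
    }
}
```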

@@ -22,6 +22,9 @@ import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -40,6 +43,10 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE;
import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION;
/** The frontend entrypoint for the command-line interface of Flink CDC. */
public class CliFrontend {
private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
@@ -90,6 +97,9 @@ public class CliFrontend {
Path flinkHome = getFlinkHome(commandLine);
Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
// Savepoint
SavepointRestoreSettings savepointSettings = createSavepointRestoreSettings(commandLine);
// Additional JARs
List<Path> additionalJars =
Arrays.stream(
@@ -105,7 +115,31 @@ flinkConfig,
flinkConfig,
globalPipelineConfig,
commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER),
additionalJars);
additionalJars,
savepointSettings);
}
private static SavepointRestoreSettings createSavepointRestoreSettings(
CommandLine commandLine) {
if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
boolean allowNonRestoredState =
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
final RestoreMode restoreMode;
if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
restoreMode =
org.apache.flink.configuration.ConfigurationUtils.convertValue(
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
RestoreMode.class);
} else {
restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue();
}
// allowNonRestoredState is controlled by the -n/--allow-nonRestored-state flag.
return SavepointRestoreSettings.forPath(
savepointPath, allowNonRestoredState, restoreMode);
} else {
return SavepointRestoreSettings.none();
}
}
private static Path getFlinkHome(CommandLine commandLine) {
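
`createSavepointRestoreSettings` maps the three new CLI flags onto Flink's `SavepointRestoreSettings`. The sketch below (path and mode values are illustrative, not from this commit) performs the same mapping directly, using the same `ConfigurationUtils.convertValue` call to turn the `--claim-mode` string into a `RestoreMode`.

```java
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class ClaimModeParsingSketch {

    public static void main(String[] args) {
        // "-cm no_claim" -> RestoreMode.NO_CLAIM (the conversion is case-insensitive).
        RestoreMode restoreMode = ConfigurationUtils.convertValue("no_claim", RestoreMode.class);

        // Equivalent of "-s /tmp/savepoints/savepoint-1 -cm no_claim -n".
        SavepointRestoreSettings settings =
                SavepointRestoreSettings.forPath("/tmp/savepoints/savepoint-1", true, restoreMode);

        System.out.println(settings.getRestorePath());        // /tmp/savepoints/savepoint-1
        System.out.println(settings.getRestoreMode());         // NO_CLAIM
        System.out.println(settings.allowNonRestoredState());  // true
    }
}
```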

@@ -53,12 +53,46 @@ public class CliFrontendOptions {
.desc("Use Flink MiniCluster to run the pipeline")
.build();
public static final Option SAVEPOINT_PATH_OPTION =
Option.builder("s")
.longOpt("from-savepoint")
.hasArg(true)
.desc(
"Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537")
.build();
public static final Option SAVEPOINT_CLAIM_MODE =
Option.builder("cm")
.longOpt("claim-mode")
.hasArg(true)
.desc(
"Defines how should we restore from the given savepoint. Supported options: "
+ "[claim - claim ownership of the savepoint and delete once it is"
+ " subsumed, no_claim (default) - do not claim ownership, the first"
+ " checkpoint will not reuse any files from the restored one, legacy "
+ "- the old behaviour, do not assume ownership of the savepoint files,"
+ " but can reuse some shared files")
.build();
public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION =
Option.builder("n")
.longOpt("allow-nonRestored-state")
.hasArg(false)
.desc(
"Allow to skip savepoint state that cannot be restored. "
+ "You need to allow this if you removed an operator from your "
+ "program that was part of the program when the savepoint was triggered.")
.build();
public static Options initializeOptions() {
return new Options()
.addOption(HELP)
.addOption(JAR)
.addOption(FLINK_HOME)
.addOption(GLOBAL_CONFIG)
.addOption(USE_MINI_CLUSTER);
.addOption(USE_MINI_CLUSTER)
.addOption(SAVEPOINT_PATH_OPTION)
.addOption(SAVEPOINT_CLAIM_MODE)
.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
}
}
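
The new options plug into the existing commons-cli setup. A minimal sketch of parsing them through `initializeOptions()` follows; the argument values, including the `mysql-to-doris.yaml` pipeline path, are placeholders.

```java
import org.apache.flink.cdc.cli.CliFrontendOptions;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;

public class OptionsParsingSketch {

    public static void main(String[] args) throws Exception {
        String[] cliArgs = {
            "mysql-to-doris.yaml",
            "-s", "/tmp/savepoints/savepoint-1",
            "-cm", "no_claim",
            "-n"
        };

        CommandLine commandLine =
                new DefaultParser().parse(CliFrontendOptions.initializeOptions(), cliArgs);

        // Lookups use the short option names ("s", "cm", "n"), as CliFrontend does.
        System.out.println(
                commandLine.getOptionValue(CliFrontendOptions.SAVEPOINT_PATH_OPTION.getOpt()));
        System.out.println(
                commandLine.getOptionValue(CliFrontendOptions.SAVEPOINT_CLAIM_MODE.getOpt()));
        System.out.println(
                commandLine.hasOption(CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt()));
    }
}
```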

@@ -19,6 +19,7 @@ package org.apache.flink.cdc.cli.utils;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import java.nio.file.Path;
import java.util.List;
@@ -35,12 +36,16 @@ public class FlinkEnvironmentUtils {
}
public static FlinkPipelineComposer createComposer(
boolean useMiniCluster, Configuration flinkConfig, List<Path> additionalJars) {
boolean useMiniCluster,
Configuration flinkConfig,
List<Path> additionalJars,
SavepointRestoreSettings savepointSettings) {
if (useMiniCluster) {
return FlinkPipelineComposer.ofMiniCluster();
}
return FlinkPipelineComposer.ofRemoteCluster(
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
additionalJars);
org.apache.flink.configuration.Configuration configuration =
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap());
SavepointRestoreSettings.toConfiguration(savepointSettings, configuration);
return FlinkPipelineComposer.ofRemoteCluster(configuration, additionalJars);
}
}
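
`FlinkEnvironmentUtils` now copies the restore settings into the `Configuration` handed to the remote composer via `SavepointRestoreSettings.toConfiguration`. A short sketch of what that call writes (path and mode are illustrative), read back through the standard `execution.savepoint.*` options:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class SavepointToConfigurationSketch {

    public static void main(String[] args) {
        Configuration configuration = new Configuration();

        SavepointRestoreSettings settings =
                SavepointRestoreSettings.forPath(
                        "/tmp/savepoints/savepoint-1", false, RestoreMode.NO_CLAIM);

        // Writes execution.savepoint.path, execution.savepoint-restore-mode and
        // execution.savepoint.ignore-unclaimed-state into the configuration.
        SavepointRestoreSettings.toConfiguration(settings, configuration);

        System.out.println(configuration.get(SavepointConfigOptions.SAVEPOINT_PATH));
        System.out.println(configuration.get(SavepointConfigOptions.RESTORE_MODE));
        System.out.println(configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
    }
}
```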

@@ -20,6 +20,7 @@ package org.apache.flink.cdc.cli;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
@@ -85,6 +86,25 @@ class CliFrontendTest {
assertThat(executor.getGlobalPipelineConfig().toMap().get("foo")).isEqualTo("bar");
}
@Test
void testSavePointConfiguration() throws Exception {
CliExecutor executor =
createExecutor(
pipelineDef(),
"--flink-home",
flinkHome(),
"-s",
flinkHome() + "/savepoints/savepoint-1",
"-cm",
"no_claim",
"-n");
assertThat(executor.getSavepointSettings().getRestorePath())
.isEqualTo(flinkHome() + "/savepoints/savepoint-1");
assertThat(executor.getSavepointSettings().getRestoreMode())
.isEqualTo(RestoreMode.NO_CLAIM);
assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue();
}
@Test
void testAdditionalJar() throws Exception {
String aJar = "/foo/jar/a.jar";
@@ -134,12 +154,29 @@ private static final String HELP_MESSAGE =
private static final String HELP_MESSAGE =
"usage:\n"
+ " --flink-home <arg> Path of Flink home directory\n"
+ " --global-config <arg> Path of the global configuration file for Flink\n"
+ " CDC pipelines\n"
+ " -h,--help Display help message\n"
+ " --jar <arg> JARs to be submitted together with the pipeline\n"
+ " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n";
+ " -cm,--claim-mode <arg> Defines how should we restore from the given\n"
+ " savepoint. Supported options: [claim - claim\n"
+ " ownership of the savepoint and delete once it\n"
+ " is subsumed, no_claim (default) - do not\n"
+ " claim ownership, the first checkpoint will\n"
+ " not reuse any files from the restored one,\n"
+ " legacy - the old behaviour, do not assume\n"
+ " ownership of the savepoint files, but can\n"
+ " reuse some shared files\n"
+ " --flink-home <arg> Path of Flink home directory\n"
+ " --global-config <arg> Path of the global configuration file for\n"
+ " Flink CDC pipelines\n"
+ " -h,--help Display help message\n"
+ " --jar <arg> JARs to be submitted together with the\n"
+ " pipeline\n"
+ " -n,--allow-nonRestored-state Allow to skip savepoint state that cannot be\n"
+ " restored. You need to allow this if you\n"
+ " removed an operator from your program that\n"
+ " was part of the program when the savepoint\n"
+ " was triggered.\n"
+ " -s,--from-savepoint <arg> Path to a savepoint to restore the job from\n"
+ " (for example hdfs:///flink/savepoint-1537\n"
+ " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n";
private static class NoOpComposer implements PipelineComposer {
