[FLINK-36105][flink-cdc-cli] Fix unable to restore from checkpoint with Flink 1.20

This closes #3701.
pull/3743/head
yuxiqian 5 months ago committed by Leonard Xu
parent 1aa89733d8
commit a92dba6530

@ -22,7 +22,6 @@ import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@ -34,6 +33,7 @@ import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
@ -120,18 +120,43 @@ public class CliFrontend {
String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
boolean allowNonRestoredState =
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
final RestoreMode restoreMode;
final Object restoreMode;
if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
restoreMode =
org.apache.flink.configuration.ConfigurationUtils.convertValue(
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
RestoreMode.class);
ConfigurationUtils.getClaimModeClass());
} else {
restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue();
}
// allowNonRestoredState is always false because all operators are predefined.
return SavepointRestoreSettings.forPath(
savepointPath, allowNonRestoredState, restoreMode);
return (SavepointRestoreSettings)
Arrays.stream(SavepointRestoreSettings.class.getMethods())
.filter(
method ->
method.getName().equals("forPath")
&& method.getParameterCount() == 3)
.findFirst()
.map(
method -> {
try {
return method.invoke(
null,
savepointPath,
allowNonRestoredState,
restoreMode);
} catch (IllegalAccessException
| InvocationTargetException e) {
throw new RuntimeException(
"Failed to invoke SavepointRestoreSettings#forPath nethod.",
e);
}
})
.orElseThrow(
() ->
new RuntimeException(
"Failed to resolve SavepointRestoreSettings#forPath method."));
} else {
return SavepointRestoreSettings.none();
}

@ -75,4 +75,16 @@ public class ConfigurationUtils {
&& !target.equalsIgnoreCase(LocalExecutor.NAME)
&& !target.equalsIgnoreCase(RemoteExecutor.NAME);
}
public static Class<?> getClaimModeClass() {
try {
return Class.forName("org.apache.flink.core.execution.RestoreMode");
} catch (ClassNotFoundException ignored) {
try {
return Class.forName("org.apache.flink.runtime.jobgraph.RestoreMode");
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}
}

Loading…
Cancel
Save