From 5d58769fe8058242123614cd9729d6660f8f4d81 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Thu, 7 Nov 2024 18:13:42 +0800 Subject: [PATCH] [FLINK-36105][flink-cdc-cli] Fix unable to restore from checkpoint with Flink 1.20 (#3564) --- .../workflows/flink_cdc_migration_test.yml | 3 +- .../org/apache/flink/cdc/cli/CliFrontend.java | 35 ++++++++++++++++--- .../cdc/cli/utils/ConfigurationUtils.java | 12 +++++++ 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/.github/workflows/flink_cdc_migration_test.yml b/.github/workflows/flink_cdc_migration_test.yml index db5aa86d3..704942560 100644 --- a/.github/workflows/flink_cdc_migration_test.yml +++ b/.github/workflows/flink_cdc_migration_test.yml @@ -39,8 +39,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - # '1.20.0' is excluded since FLINK-36105 has not been merged. - flink-version: [ '1.18.1', '1.19.1' ] + flink-version: [ '1.18.1', '1.19.1', '1.20.0' ] steps: - uses: actions/checkout@v4 diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java index ac746e224..27cb95ec9 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java @@ -22,7 +22,6 @@ import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.composer.PipelineExecution; -import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -34,6 +33,7 @@ import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.InvocationTargetException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; @@ -120,18 +120,43 @@ public class CliFrontend { String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt()); boolean allowNonRestoredState = commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt()); - final RestoreMode restoreMode; + final Object restoreMode; if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) { restoreMode = org.apache.flink.configuration.ConfigurationUtils.convertValue( commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE), - RestoreMode.class); + ConfigurationUtils.getClaimModeClass()); } else { restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue(); } // allowNonRestoredState is always false because all operators are predefined. - return SavepointRestoreSettings.forPath( - savepointPath, allowNonRestoredState, restoreMode); + + return (SavepointRestoreSettings) + Arrays.stream(SavepointRestoreSettings.class.getMethods()) + .filter( + method -> + method.getName().equals("forPath") + && method.getParameterCount() == 3) + .findFirst() + .map( + method -> { + try { + return method.invoke( + null, + savepointPath, + allowNonRestoredState, + restoreMode); + } catch (IllegalAccessException + | InvocationTargetException e) { + throw new RuntimeException( + "Failed to invoke SavepointRestoreSettings#forPath nethod.", + e); + } + }) + .orElseThrow( + () -> + new RuntimeException( + "Failed to resolve SavepointRestoreSettings#forPath method.")); } else { return SavepointRestoreSettings.none(); } diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java index 6d2100aa5..af1bc7318 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java @@ -75,4 +75,16 @@ public class ConfigurationUtils { && !target.equalsIgnoreCase(LocalExecutor.NAME) && !target.equalsIgnoreCase(RemoteExecutor.NAME); } + + public static Class getClaimModeClass() { + try { + return Class.forName("org.apache.flink.core.execution.RestoreMode"); + } catch (ClassNotFoundException ignored) { + try { + return Class.forName("org.apache.flink.runtime.jobgraph.RestoreMode"); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + } }