From de4be372f8d553d3bb1610305a282a9be53634e3 Mon Sep 17 00:00:00 2001
From: yux <34335406+yuxiqian@users.noreply.github.com>
Date: Wed, 24 Apr 2024 14:03:22 +0800
Subject: [PATCH] [hotfix][e2e] Fix pipeline e2e test cases failures (#3246)
---
docs/content.zh/docs/connectors/mysql.md | 2 +-
docs/content/docs/connectors/mysql.md | 2 +-
.../cdc/cli/utils/ConfigurationUtils.java | 8 +++-
.../cdc/cli/utils/FlinkEnvironmentUtils.java | 2 +-
.../flink/cdc/cli/utils/YamlParserUtils.java | 38 +++++++++++++++++--
.../cdc/cli/utils/ConfigurationUtilsTest.java | 3 +-
.../resources/flink-home/conf/flink-conf.yaml | 3 ++
.../cdc/pipeline/tests/MysqlE2eITCase.java | 3 +-
.../tests/utils/PipelineTestEnvironment.java | 7 +++-
9 files changed, 58 insertions(+), 10 deletions(-)
diff --git a/docs/content.zh/docs/connectors/mysql.md b/docs/content.zh/docs/connectors/mysql.md
index 0fccd82f1..99b573df6 100644
--- a/docs/content.zh/docs/connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/mysql.md
@@ -31,7 +31,7 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量
## 依赖配置
由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。
-您可能需要手动配置以下依赖:
+您可能需要手动配置以下依赖,并在提交 YAML 作业时使用 Flink CDC CLI 的 `--jar` 参数将其传入:
diff --git a/docs/content/docs/connectors/mysql.md b/docs/content/docs/connectors/mysql.md
index d2878faba..a314daf45 100644
--- a/docs/content/docs/connectors/mysql.md
+++ b/docs/content/docs/connectors/mysql.md
@@ -32,7 +32,7 @@ This document describes how to setup the MySQL connector.
## Dependencies
Since MySQL Connector's GPLv2 license is incompatible with Flink CDC project, we can't provide MySQL connector in prebuilt connector jar packages.
-You may need to configure the following dependencies manually.
+You may need to configure the following dependencies manually, and pass them with the `--jar` argument of the Flink CDC CLI when submitting YAML pipeline jobs.
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
index 38fa608f5..8bc4ba628 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java
@@ -30,7 +30,13 @@ public class ConfigurationUtils {
private static final String KEY_SEPARATOR = ".";
public static Configuration loadConfigFile(Path configPath) throws Exception {
- Map configMap = YamlParserUtils.loadYamlFile(configPath.toFile());
+ return loadConfigFile(configPath, false);
+ }
+
+ public static Configuration loadConfigFile(Path configPath, boolean allowDuplicateKeys)
+ throws Exception {
+ Map configMap =
+ YamlParserUtils.loadYamlFile(configPath.toFile(), allowDuplicateKeys);
return Configuration.fromMap(flattenConfigMap(configMap, ""));
}
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
index d250ea835..7c4a7acea 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java
@@ -46,7 +46,7 @@ public class FlinkEnvironmentUtils {
FLINK_CONF_FILENAME,
LEGACY_FLINK_CONF_FILENAME);
return ConfigurationUtils.loadConfigFile(
- flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME));
+ flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME), true);
}
}
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/YamlParserUtils.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/YamlParserUtils.java
index d47332b01..1e5889c4b 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/YamlParserUtils.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/YamlParserUtils.java
@@ -82,7 +82,22 @@ public class YamlParserUtils {
new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings));
private static final Load loader =
- new Load(LoadSettings.builder().setSchema(new CoreSchema()).build());
+ new Load(
+ LoadSettings.builder()
+ .setSchema(new CoreSchema())
+ .setAllowDuplicateKeys(false)
+ .build());
+
+ private static final Load legacyLoader =
+ new Load(
+ LoadSettings.builder()
+ .setSchema(new CoreSchema())
+ .setAllowDuplicateKeys(true)
+ .build());
+
+ private static Load getYamlLoader(boolean allowDuplicateKeys) {
+ return allowDuplicateKeys ? legacyLoader : loader;
+ }
/**
* Loads the contents of the given YAML file into a map.
@@ -94,12 +109,29 @@ public class YamlParserUtils {
* @throws YamlEngineException if the file cannot be parsed.
* @throws IOException if an I/O error occurs while reading from the file stream.
*/
- @SuppressWarnings("unchecked")
public static synchronized @Nonnull Map loadYamlFile(File file)
throws Exception {
+ return loadYamlFile(file, false);
+ }
+
+ /**
+ * Loads the contents of the given YAML file into a map.
+ *
+ * @param file the YAML file to load.
+ * @param allowDuplicateKeys whether duplicate keys are allowed, as in legacy flink-conf.yaml.
+ * @return a non-null map representing the YAML content. If the file is empty or only contains
+ * comments, an empty map is returned.
+ * @throws FileNotFoundException if the YAML file is not found.
+ * @throws YamlEngineException if the file cannot be parsed.
+ * @throws IOException if an I/O error occurs while reading from the file stream.
+ */
+ @SuppressWarnings("unchecked")
+ public static synchronized @Nonnull Map loadYamlFile(
+ File file, boolean allowDuplicateKeys) throws Exception {
try (FileInputStream inputStream = new FileInputStream((file))) {
Map yamlResult =
- (Map) loader.loadFromInputStream(inputStream);
+ (Map)
+ getYamlLoader(allowDuplicateKeys).loadFromInputStream(inputStream);
return yamlResult == null ? new HashMap<>() : yamlResult;
} catch (FileNotFoundException e) {
LOG.error("Failed to find YAML file", e);
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java
index 7e012fce9..b12681190 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/utils/ConfigurationUtilsTest.java
@@ -75,7 +75,8 @@ class ConfigurationUtilsTest {
void loadConfigFile(String resourcePath) throws Exception {
URL resource = Resources.getResource(resourcePath);
Path path = Paths.get(resource.toURI());
- Configuration configuration = ConfigurationUtils.loadConfigFile(path);
+ Configuration configuration =
+ ConfigurationUtils.loadConfigFile(path, resourcePath.endsWith("flink-conf.yaml"));
Map configMap = configuration.toMap();
for (Map.Entry, Object> entry : CONFIG_OPTIONS.entrySet()) {
String key = entry.getKey().key();
diff --git a/flink-cdc-cli/src/test/resources/flink-home/conf/flink-conf.yaml b/flink-cdc-cli/src/test/resources/flink-home/conf/flink-conf.yaml
index 27d1f712e..3d25a04bd 100644
--- a/flink-cdc-cli/src/test/resources/flink-home/conf/flink-conf.yaml
+++ b/flink-cdc-cli/src/test/resources/flink-home/conf/flink-conf.yaml
@@ -33,6 +33,9 @@ env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-export
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.
+# Legacy flink-conf.yaml files may contain duplicate keys.
+# The duplicated jobmanager.rpc.address key below checks that the YAML parser can handle them.
+jobmanager.rpc.address: shaded
jobmanager.rpc.address: localhost
# The RPC port where the JobManager is reachable.
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 4f0d9e002..330717586 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -110,7 +110,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
mysqlInventoryDatabase.getDatabaseName());
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
- submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar);
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUtilSpecificEvent(
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 72dff1818..a448bf554 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -174,8 +174,13 @@ public abstract class PipelineTestEnvironment extends TestLogger {
Files.write(script, pipelineJob.getBytes());
jobManager.copyFileToContainer(
MountableFile.forHostPath(script), "/tmp/flinkCDC/conf/pipeline.yaml");
+ StringBuilder sb = new StringBuilder();
+ for (Path jar : jars) {
+ sb.append(" --jar /tmp/flinkCDC/lib/").append(jar.getFileName());
+ }
String commands =
- "/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml --flink-home /opt/flink";
+ "/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml --flink-home /opt/flink"
+ + sb;
ExecResult execResult = jobManager.execInContainer("bash", "-c", commands);
LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr());