[hotfix][e2e] Fix pipeline e2e test cases failures (#3246)

pull/3124/head^2
yux 9 months ago committed by GitHub
parent ef2eece256
commit de4be372f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -31,7 +31,7 @@ MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量
## 依赖配置

由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。
您可能需要手动配置以下依赖,并在提交 YAML 作业时使用 Flink CDC CLI 的 `--jar` 参数将其传入。

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">

@ -32,7 +32,7 @@ This document describes how to setup the MySQL connector.
## Dependencies

Since MySQL Connector's GPLv2 license is incompatible with the Flink CDC project, we can't provide the MySQL connector in prebuilt connector jar packages.
You may need to configure the following dependencies manually, and pass them with the `--jar` argument of the Flink CDC CLI when submitting YAML pipeline jobs.

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">

@ -30,7 +30,13 @@ public class ConfigurationUtils {
private static final String KEY_SEPARATOR = "."; private static final String KEY_SEPARATOR = ".";
public static Configuration loadConfigFile(Path configPath) throws Exception { public static Configuration loadConfigFile(Path configPath) throws Exception {
Map<String, Object> configMap = YamlParserUtils.loadYamlFile(configPath.toFile()); return loadConfigFile(configPath, false);
}
public static Configuration loadConfigFile(Path configPath, boolean allowDuplicateKeys)
throws Exception {
Map<String, Object> configMap =
YamlParserUtils.loadYamlFile(configPath.toFile(), allowDuplicateKeys);
return Configuration.fromMap(flattenConfigMap(configMap, "")); return Configuration.fromMap(flattenConfigMap(configMap, ""));
} }

@ -46,7 +46,7 @@ public class FlinkEnvironmentUtils {
FLINK_CONF_FILENAME, FLINK_CONF_FILENAME,
LEGACY_FLINK_CONF_FILENAME); LEGACY_FLINK_CONF_FILENAME);
return ConfigurationUtils.loadConfigFile( return ConfigurationUtils.loadConfigFile(
flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME)); flinkHome.resolve(FLINK_CONF_DIR).resolve(LEGACY_FLINK_CONF_FILENAME), true);
} }
} }

@ -82,7 +82,22 @@ public class YamlParserUtils {
new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings)); new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings));
private static final Load loader = private static final Load loader =
new Load(LoadSettings.builder().setSchema(new CoreSchema()).build()); new Load(
LoadSettings.builder()
.setSchema(new CoreSchema())
.setAllowDuplicateKeys(false)
.build());
private static final Load legacyLoader =
new Load(
LoadSettings.builder()
.setSchema(new CoreSchema())
.setAllowDuplicateKeys(true)
.build());
private static Load getYamlLoader(boolean allowDuplicateKeys) {
return allowDuplicateKeys ? legacyLoader : loader;
}
/** /**
* Loads the contents of the given YAML file into a map. * Loads the contents of the given YAML file into a map.
@ -94,12 +109,29 @@ public class YamlParserUtils {
* @throws YamlEngineException if the file cannot be parsed. * @throws YamlEngineException if the file cannot be parsed.
* @throws IOException if an I/O error occurs while reading from the file stream. * @throws IOException if an I/O error occurs while reading from the file stream.
*/ */
@SuppressWarnings("unchecked")
public static synchronized @Nonnull Map<String, Object> loadYamlFile(File file) public static synchronized @Nonnull Map<String, Object> loadYamlFile(File file)
throws Exception { throws Exception {
return loadYamlFile(file, false);
}
/**
* Loads the contents of the given YAML file into a map.
*
* @param file the YAML file to load.
* @param allowDuplicateKeys whether to allow duplicated keys.
* @return a non-null map representing the YAML content. If the file is empty or only contains
* comments, an empty map is returned.
* @throws FileNotFoundException if the YAML file is not found.
* @throws YamlEngineException if the file cannot be parsed.
* @throws IOException if an I/O error occurs while reading from the file stream.
*/
@SuppressWarnings("unchecked")
public static synchronized @Nonnull Map<String, Object> loadYamlFile(
File file, boolean allowDuplicateKeys) throws Exception {
try (FileInputStream inputStream = new FileInputStream((file))) { try (FileInputStream inputStream = new FileInputStream((file))) {
Map<String, Object> yamlResult = Map<String, Object> yamlResult =
(Map<String, Object>) loader.loadFromInputStream(inputStream); (Map<String, Object>)
getYamlLoader(allowDuplicateKeys).loadFromInputStream(inputStream);
return yamlResult == null ? new HashMap<>() : yamlResult; return yamlResult == null ? new HashMap<>() : yamlResult;
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
LOG.error("Failed to find YAML file", e); LOG.error("Failed to find YAML file", e);

@ -75,7 +75,8 @@ class ConfigurationUtilsTest {
void loadConfigFile(String resourcePath) throws Exception { void loadConfigFile(String resourcePath) throws Exception {
URL resource = Resources.getResource(resourcePath); URL resource = Resources.getResource(resourcePath);
Path path = Paths.get(resource.toURI()); Path path = Paths.get(resource.toURI());
Configuration configuration = ConfigurationUtils.loadConfigFile(path); Configuration configuration =
ConfigurationUtils.loadConfigFile(path, resourcePath.endsWith("flink-conf.yaml"));
Map<String, String> configMap = configuration.toMap(); Map<String, String> configMap = configuration.toMap();
for (Map.Entry<ConfigOption<?>, Object> entry : CONFIG_OPTIONS.entrySet()) { for (Map.Entry<ConfigOption<?>, Object> entry : CONFIG_OPTIONS.entrySet()) {
String key = entry.getKey().key(); String key = entry.getKey().key();

@ -33,6 +33,9 @@ env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-export
# automatically configure the host name based on the hostname of the node where the # automatically configure the host name based on the hostname of the node where the
# JobManager runs. # JobManager runs.
# Legacy flink-conf.yaml allows duplicate keys in yaml.
# This key is meant to check if the yaml parser is able to handle duplicate keys.
jobmanager.rpc.address: shaded
jobmanager.rpc.address: localhost
# The RPC port where the JobManager is reachable. # The RPC port where the JobManager is reachable.

@ -110,7 +110,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
mysqlInventoryDatabase.getDatabaseName()); mysqlInventoryDatabase.getDatabaseName());
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30)); waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running"); LOG.info("Pipeline job is running");
waitUtilSpecificEvent( waitUtilSpecificEvent(

@ -174,8 +174,13 @@ public abstract class PipelineTestEnvironment extends TestLogger {
Files.write(script, pipelineJob.getBytes()); Files.write(script, pipelineJob.getBytes());
jobManager.copyFileToContainer( jobManager.copyFileToContainer(
MountableFile.forHostPath(script), "/tmp/flinkCDC/conf/pipeline.yaml"); MountableFile.forHostPath(script), "/tmp/flinkCDC/conf/pipeline.yaml");
StringBuilder sb = new StringBuilder();
for (Path jar : jars) {
sb.append(" --jar /tmp/flinkCDC/lib/").append(jar.getFileName());
}
String commands = String commands =
"/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml --flink-home /opt/flink"; "/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml --flink-home /opt/flink"
+ sb;
ExecResult execResult = jobManager.execInContainer("bash", "-c", commands); ExecResult execResult = jobManager.execInContainer("bash", "-c", commands);
LOG.info(execResult.getStdout()); LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr()); LOG.error(execResult.getStderr());

Loading…
Cancel
Save