From af02ce1bc948c8b70807ef11f839544e3a22e836 Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Thu, 8 Aug 2024 17:45:07 +0800
Subject: [PATCH] [build][e2e] Separate Pipeline and Source E2e tests and cover
flink 1.20 version
This closes #3514.
---
.github/workflows/flink_cdc.yml | 21 ++++++++++++-------
.../flink-cdc-pipeline-e2e-tests/pom.xml | 3 ++-
.../tests/utils/PipelineTestEnvironment.java | 2 +-
.../flink-cdc-source-e2e-tests/pom.xml | 20 ++++++++++++++----
.../utils/FlinkContainerTestEnvironment.java | 10 ++-------
5 files changed, 35 insertions(+), 21 deletions(-)
diff --git a/.github/workflows/flink_cdc.yml b/.github/workflows/flink_cdc.yml
index ef67eb55d..ac0dfdc82 100644
--- a/.github/workflows/flink_cdc.yml
+++ b/.github/workflows/flink_cdc.yml
@@ -94,8 +94,10 @@ env:
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-vitess-cdc,\
flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-vitess-cdc"
- MODULES_E2E: "\
- flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests,\
+ MODULES_PIPELINE_E2E: "\
+ flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests"
+
+ MODULES_SOURCE_E2E: "\
flink-cdc-e2e-tests/flink-cdc-source-e2e-tests"
jobs:
@@ -134,7 +136,8 @@ jobs:
"oceanbase",
"db2",
"vitess",
- "e2e"
+ "pipeline_e2e",
+ "source_e2e"
]
timeout-minutes: 120
env:
@@ -222,13 +225,17 @@ jobs:
("vitess")
modules=${{ env.MODULES_VITESS }}
;;
- ("e2e")
- compile_modules="${{ env.MODULES_CORE }},${{ env.MODULES_PIPELINE_CONNECTORS }},${{ env.MODULES_MYSQL }},${{ env.MODULES_POSTGRES }},${{ env.MODULES_ORACLE }},${{ env.MODULES_MONGODB }},${{ env.MODULES_SQLSERVER }},${{ env.MODULES_TIDB }},${{ env.MODULES_OCEANBASE }},${{ env.MODULES_DB2 }},${{ env.MODULES_VITESS }},${{ env.MODULES_E2E }}"
- modules=${{ env.MODULES_E2E }}
+ ("pipeline_e2e")
+ compile_modules="${{ env.MODULES_CORE }},${{ env.MODULES_PIPELINE_CONNECTORS }},${{ env.MODULES_MYSQL }},${{ env.MODULES_POSTGRES }},${{ env.MODULES_ORACLE }},${{ env.MODULES_MONGODB }},${{ env.MODULES_SQLSERVER }},${{ env.MODULES_TIDB }},${{ env.MODULES_OCEANBASE }},${{ env.MODULES_DB2 }},${{ env.MODULES_VITESS }},${{ env.MODULES_PIPELINE_E2E }}"
+ modules=${{ env.MODULES_PIPELINE_E2E }}
+ ;;
+ ("source_e2e")
+ compile_modules="${{ env.MODULES_CORE }},${{ env.MODULES_PIPELINE_CONNECTORS }},${{ env.MODULES_MYSQL }},${{ env.MODULES_POSTGRES }},${{ env.MODULES_ORACLE }},${{ env.MODULES_MONGODB }},${{ env.MODULES_SQLSERVER }},${{ env.MODULES_TIDB }},${{ env.MODULES_OCEANBASE }},${{ env.MODULES_DB2 }},${{ env.MODULES_VITESS }},${{ env.MODULES_SOURCE_E2E }}"
+ modules=${{ env.MODULES_SOURCE_E2E }}
;;
esac
- if [ ${{ matrix.module }} != "e2e" ]; then
+ if [ ${{ matrix.module }} != "pipeline_e2e" ] && [ ${{ matrix.module }} != "source_e2e" ]; then
compile_modules=$modules
fi
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 2326240b6..557ecafa4 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -30,7 +30,8 @@ limitations under the License.
1.17.2
1.18.1
- 1.19.0
+ 1.19.1
+ 1.20.0
8.0.27
1.2.9_flink-${flink.major.version}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 1143012a6..d1c0bb7e7 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -88,7 +88,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List getFlinkVersion() {
- return Arrays.asList("1.17.2", "1.18.1", "1.19.0");
+ return Arrays.asList("1.17.2", "1.18.1", "1.19.1", "1.20.0");
}
@Before
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index a7f35cf04..c1e50101a 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -31,10 +31,12 @@ limitations under the License.
1.16.3
1.17.2
1.18.1
- 1.19.0
- 3.1.1-1.17
- 3.1.2-1.18
- 3.1.2-1.18
+ 1.19.1
+ 1.20.0
+ 3.1.2-1.17
+ 3.2.0-1.18
+ 3.2.0-1.19
+ 3.2.0-1.19
8.0.27
42.7.3
@@ -277,6 +279,16 @@ limitations under the License.
+
+ org.apache.flink
+ flink-connector-jdbc
+ ${jdbc.version-1.20}
+ jdbc-connector_${flink-1.20}.jar
+ jar
+ ${project.build.directory}/dependencies
+
+
+
org.apache.flink
flink-sql-connector-mysql-cdc
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
index 6175aec2c..91e7e5c6f 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
@@ -120,12 +120,9 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List getFlinkVersion() {
- return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0");
+ return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.1", "1.20.0");
}
- private static final List FLINK_VERSION_WITH_SCALA_212 =
- Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0");
-
@Before
public void before() {
mysqlInventoryDatabase.createAndInitialize();
@@ -309,10 +306,7 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
}
private String getFlinkDockerImageTag() {
- if (FLINK_VERSION_WITH_SCALA_212.contains(flinkVersion)) {
- return String.format("flink:%s-scala_2.12", flinkVersion);
- }
- return String.format("flink:%s-scala_2.11", flinkVersion);
+ return String.format("flink:%s-scala_2.12", flinkVersion);
}
protected String getJdbcConnectorResourceName() {