From d8a9c8c63ef5fb958a72c3a15e47ee29d72a481e Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Wed, 5 Jun 2024 12:09:08 +0800
Subject: [PATCH] [FLINK-35316][base] Run E2e test cases with Flink 1.19 and
latest patch versions
---
.../flink-cdc-pipeline-e2e-tests/pom.xml | 7 ++-
.../cdc/pipeline/tests/MysqlE2eITCase.java | 24 ++++----
.../pipeline/tests/TransformE2eITCase.java | 10 ++--
.../tests/utils/PipelineTestEnvironment.java | 55 +++++++++++++-----
.../flink-cdc-source-e2e-tests/pom.xml | 38 +++++--------
.../utils/FlinkContainerTestEnvironment.java | 56 ++++++++++++++-----
6 files changed, 120 insertions(+), 70 deletions(-)
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index e785ba0a5..2326240b6 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -28,8 +28,9 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-e2e-tests</artifactId>

<properties>
- <flink-1.17>1.17.1</flink-1.17>
- <flink-1.18>1.18.0</flink-1.18>
+ <flink-1.17>1.17.2</flink-1.17>
+ <flink-1.18>1.18.1</flink-1.18>
+ <flink-1.19>1.19.0</flink-1.19>
<mysql.driver.version>8.0.27</mysql.driver.version>
<starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
@@ -88,12 +89,14 @@ limitations under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
<version>${project.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
<version>${project.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 330717586..28db063ef 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -114,12 +114,12 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
- waitUtilSpecificEvent(
+ waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
60000L);
- waitUtilSpecificEvent(
+ waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
@@ -186,6 +186,14 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
Statement stat = conn.createStatement()) {
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+ // Make sure the UPDATE events above have been captured before performing DDL changes
+ waitUntilSpecificEvent(
+ String.format(
+ "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
+ mysqlInventoryDatabase.getDatabaseName()),
+ 20000L);
+
// modify table schema
stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
stat.execute(
@@ -201,7 +209,7 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
throw e;
}
- waitUtilSpecificEvent(
+ waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
@@ -236,17 +244,13 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
validateResult(expectedEvents);
}
- private void validateResult(List<String> expectedEvents) {
- String stdout = taskManagerConsumer.toUtf8String();
+ private void validateResult(List<String> expectedEvents) throws Exception {
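+ // Poll for each expected event within its own timeout window instead of
+ // asserting against a single stdout snapshot.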
for (String event : expectedEvents) {
- if (!stdout.contains(event)) {
- throw new RuntimeException(
- "failed to get specific event: " + event + " from stdout: " + stdout);
- }
+ waitUntilSpecificEvent(event, 6000L);
}
}
- private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
+ private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
boolean result = false;
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index cd8fc0b45..2f896d334 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -132,13 +132,13 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
- 6000L);
+ 60000L);
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
- 6000L);
+ 60000L);
List<String> expectedEvents =
Arrays.asList(
@@ -188,19 +188,19 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
- 6000L);
+ 20000L);
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
transformRenameDatabase.getDatabaseName()),
- 6000L);
+ 20000L);
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
transformRenameDatabase.getDatabaseName()),
- 6000L);
+ 20000L);
String stdout = taskManagerConsumer.toUtf8String();
System.out.println(stdout);
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index a448bf554..65c0a202e 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
+import com.fasterxml.jackson.core.Version;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -54,6 +55,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkState;
@@ -71,17 +73,6 @@ public abstract class PipelineTestEnvironment extends TestLogger {
public static final int JOB_MANAGER_REST_PORT = 8081;
public static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
public static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
- public static final String FLINK_PROPERTIES =
- String.join(
- "\n",
- Arrays.asList(
- "jobmanager.rpc.address: jobmanager",
- "taskmanager.numberOfTaskSlots: 10",
- "parallelism.default: 4",
- "execution.checkpointing.interval: 300",
- // this is needed for oracle-cdc tests.
- // see https://stackoverflow.com/a/47062742/4915129
- "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
@ClassRule public static final Network NETWORK = Network.newNetwork();
@@ -97,13 +88,16 @@ public abstract class PipelineTestEnvironment extends TestLogger {
@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List<String> getFlinkVersion() {
- return Arrays.asList("1.17.1", "1.18.0");
+ return Arrays.asList("1.17.2", "1.18.1", "1.19.0");
}
@Before
public void before() throws Exception {
LOG.info("Starting containers...");
jobManagerConsumer = new ToStringConsumer();
+
+ String flinkProperties = getFlinkProperties(flinkVersion);
+
jobManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
@@ -111,7 +105,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
.withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withEnv("FLINK_PROPERTIES", flinkProperties)
.withLogConsumer(jobManagerConsumer);
taskManagerConsumer = new ToStringConsumer();
taskManager =
@@ -120,7 +114,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
.withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withEnv("FLINK_PROPERTIES", flinkProperties)
.dependsOn(jobManager)
.withLogConsumer(taskManagerConsumer);
@@ -246,4 +240,37 @@ public abstract class PipelineTestEnvironment extends TestLogger {
protected String getFlinkDockerImageTag() {
return String.format("flink:%s-scala_2.12", flinkVersion);
}
+
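+ // Interprets a "major.minor.patch" string (e.g. "1.19.0") as a comparable
+ // version triple, reusing Jackson's Version class from the test classpath.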
+ private static Version parseVersion(String version) {
+ List<Integer> versionParts =
+ Arrays.stream(version.split("\\."))
+ .map(Integer::valueOf)
+ .limit(3)
+ .collect(Collectors.toList());
+ return new Version(
+ versionParts.get(0), versionParts.get(1), versionParts.get(2), null, null, null);
+ }
+
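+ // Builds the FLINK_PROPERTIES value for the Flink containers, selecting the
+ // JVM options key that matches the Flink version under test.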
+ private static String getFlinkProperties(String flinkVersion) {
+ // this is needed for oracle-cdc tests.
+ // see https://stackoverflow.com/a/47062742/4915129
+ String javaOptsConfig;
+ Version version = parseVersion(flinkVersion);
+ if (version.compareTo(parseVersion("1.17.0")) >= 0) {
+ // Flink 1.17 renames `env.java.opts` to `env.java.opts.all`
+ javaOptsConfig = "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false";
+ } else {
+ // Legacy Flink versions; support for them may be dropped in the near future
+ javaOptsConfig = "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false";
+ }
+
+ return String.join(
+ "\n",
+ Arrays.asList(
+ "jobmanager.rpc.address: jobmanager",
+ "taskmanager.numberOfTaskSlots: 10",
+ "parallelism.default: 4",
+ "execution.checkpointing.interval: 300",
+ javaOptsConfig));
+ }
}
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index adb3e0f82..6f604f346 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -28,13 +28,13 @@ limitations under the License.
<artifactId>flink-cdc-source-e2e-tests</artifactId>

<properties>
- <flink-1.14>1.14.6</flink-1.14>
- <flink-1.15>1.15.4</flink-1.15>
- <flink-1.16>1.16.2</flink-1.16>
- <flink-1.17>1.17.1</flink-1.17>
- <flink-1.18>1.18.0</flink-1.18>
+ <flink-1.16>1.16.3</flink-1.16>
+ <flink-1.17>1.17.2</flink-1.17>
+ <flink-1.18>1.18.1</flink-1.18>
+ <flink-1.19>1.19.0</flink-1.19>
<jdbc.version-1.17>3.1.1-1.17</jdbc.version-1.17>
- <jdbc.version-1.18>3.1.1-1.17</jdbc.version-1.18>
+ <jdbc.version-1.18>3.1.2-1.18</jdbc.version-1.18>
+ <jdbc.version-1.19>3.1.2-1.18</jdbc.version-1.19>
<mysql.driver.version>8.0.27</mysql.driver.version>
<postgresql.driver.version>42.5.1</postgresql.driver.version>
@@ -237,21 +237,11 @@ limitations under the License.
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc_2.11</artifactId>
- <version>${flink-1.14}</version>
- <destFileName>jdbc-connector_${flink-1.14}.jar</destFileName>
- <type>jar</type>
- <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
- </artifactItem>
-
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
- <version>${flink-1.15}</version>
- <destFileName>jdbc-connector_${flink-1.15}.jar</destFileName>
+ <version>${flink-1.16}</version>
+ <destFileName>jdbc-connector_${flink-1.16}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
@@ -260,8 +250,8 @@ limitations under the License.
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
- <version>${flink-1.16}</version>
- <destFileName>jdbc-connector_${flink-1.16}.jar</destFileName>
+ <version>${jdbc.version-1.17}</version>
+ <destFileName>jdbc-connector_${flink-1.17}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
@@ -270,8 +260,8 @@ limitations under the License.
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
- <version>${jdbc.version-1.17}</version>
- <destFileName>jdbc-connector_${flink-1.17}.jar</destFileName>
+ <version>${jdbc.version-1.18}</version>
+ <destFileName>jdbc-connector_${flink-1.18}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
@@ -280,8 +270,8 @@ limitations under the License.
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
- <version>${jdbc.version-1.18}</version>
- <destFileName>jdbc-connector_${flink-1.18}.jar</destFileName>
+ <version>${jdbc.version-1.19}</version>
+ <destFileName>jdbc-connector_${flink-1.19}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
index 650e205b8..6175aec2c 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
+import com.fasterxml.jackson.core.Version;
import com.github.dockerjava.api.DockerClient;
import org.junit.After;
import org.junit.AfterClass;
@@ -62,6 +63,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkState;
@@ -80,17 +82,6 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
private static final String FLINK_BIN = "bin";
private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
- private static final String FLINK_PROPERTIES =
- String.join(
- "\n",
- Arrays.asList(
- "jobmanager.rpc.address: jobmanager",
- "taskmanager.numberOfTaskSlots: 10",
- "parallelism.default: 4",
- "execution.checkpointing.interval: 10000",
- // this is needed for oracle-cdc tests.
- // see https://stackoverflow.com/a/47062742/4915129
- "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
// ------------------------------------------------------------------------------------------
// MySQL Variables (we always use MySQL as the sink for easier verifying)
@@ -129,17 +120,19 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List<String> getFlinkVersion() {
- return Arrays.asList("1.14.6", "1.15.4", "1.16.2", "1.17.1", "1.18.0");
+ return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0");
}
private static final List<String> FLINK_VERSION_WITH_SCALA_212 =
- Arrays.asList("1.15.4", "1.16.2", "1.17.1", "1.18.0");
+ Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0");
@Before
public void before() {
mysqlInventoryDatabase.createAndInitialize();
jdbcJar = TestUtils.getResource(getJdbcConnectorResourceName());
+ String flinkProperties = getFlinkProperties(flinkVersion);
+
LOG.info("Starting containers...");
jobManager =
new GenericContainer<>(getFlinkDockerImageTag())
@@ -148,7 +141,7 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
.withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withEnv("FLINK_PROPERTIES", flinkProperties)
.withLogConsumer(new Slf4jLogConsumer(LOG));
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
@@ -156,7 +149,7 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
.withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
- .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withEnv("FLINK_PROPERTIES", flinkProperties)
.dependsOn(jobManager)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@@ -325,4 +318,37 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
protected String getJdbcConnectorResourceName() {
return String.format("jdbc-connector_%s.jar", flinkVersion);
}
+
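+ // Interprets a "major.minor.patch" string (e.g. "1.19.0") as a comparable
+ // version triple, reusing Jackson's Version class from the test classpath.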
+ private static Version parseVersion(String version) {
+ List<Integer> versionParts =
+ Arrays.stream(version.split("\\."))
+ .map(Integer::valueOf)
+ .limit(3)
+ .collect(Collectors.toList());
+ return new Version(
+ versionParts.get(0), versionParts.get(1), versionParts.get(2), null, null, null);
+ }
+
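+ // Builds the FLINK_PROPERTIES value for the Flink containers, selecting the
+ // JVM options key that matches the Flink version under test.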
+ private static String getFlinkProperties(String flinkVersion) {
+ // this is needed for oracle-cdc tests.
+ // see https://stackoverflow.com/a/47062742/4915129
+ String javaOptsConfig;
+ Version version = parseVersion(flinkVersion);
+ if (version.compareTo(parseVersion("1.17.0")) >= 0) {
+ // Flink 1.17 renames `env.java.opts` to `env.java.opts.all`
+ javaOptsConfig = "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false";
+ } else {
+ // Legacy Flink versions; support for them may be dropped in the near future
+ javaOptsConfig = "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false";
+ }
+
+ return String.join(
+ "\n",
+ Arrays.asList(
+ "jobmanager.rpc.address: jobmanager",
+ "taskmanager.numberOfTaskSlots: 10",
+ "parallelism.default: 4",
+ "execution.checkpointing.interval: 300",
+ javaOptsConfig));
+ }
}