[FLINK-35316][base] Run E2e test cases with Flink 1.19 and latest patch versions

pull/3397/head
yuxiqian 8 months ago committed by Leonard Xu
parent fe1ceb9ff6
commit d8a9c8c63e

@ -28,8 +28,9 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-e2e-tests</artifactId>
<properties>
<flink-1.17>1.17.1</flink-1.17>
<flink-1.18>1.18.0</flink-1.18>
<flink-1.17>1.17.2</flink-1.17>
<flink-1.18>1.18.1</flink-1.18>
<flink-1.19>1.19.0</flink-1.19>
<mysql.driver.version>8.0.27</mysql.driver.version>
<starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
</properties>
@ -88,12 +89,14 @@ limitations under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>

@ -114,12 +114,12 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
60000L);
waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
@ -186,6 +186,14 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
Statement stat = conn.createStatement()) {
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
// Perform DDL changes after the binlog is generated
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
20000L);
// modify table schema
stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
stat.execute(
@ -201,7 +209,7 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
throw e;
}
waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
@ -236,17 +244,13 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
validateResult(expectedEvents);
}
private void validateResult(List<String> expectedEvents) {
String stdout = taskManagerConsumer.toUtf8String();
private void validateResult(List<String> expectedEvents) throws Exception {
for (String event : expectedEvents) {
if (!stdout.contains(event)) {
throw new RuntimeException(
"failed to get specific event: " + event + " from stdout: " + stdout);
}
waitUntilSpecificEvent(event, 6000L);
}
}
private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
boolean result = false;
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {

@ -132,13 +132,13 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);
60000L);
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);
60000L);
List<String> expectedEvents =
Arrays.asList(
@ -188,19 +188,19 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);
20000L);
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);
20000L);
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
transformRenameDatabase.getDatabaseName()),
6000L);
20000L);
String stdout = taskManagerConsumer.toUtf8String();
System.out.println(stdout);

@ -28,6 +28,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
import com.fasterxml.jackson.core.Version;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@ -54,6 +55,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkState;
@ -71,17 +73,6 @@ public abstract class PipelineTestEnvironment extends TestLogger {
public static final int JOB_MANAGER_REST_PORT = 8081;
public static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
public static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
public static final String FLINK_PROPERTIES =
String.join(
"\n",
Arrays.asList(
"jobmanager.rpc.address: jobmanager",
"taskmanager.numberOfTaskSlots: 10",
"parallelism.default: 4",
"execution.checkpointing.interval: 300",
// this is needed for oracle-cdc tests.
// see https://stackoverflow.com/a/47062742/4915129
"env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
@ClassRule public static final Network NETWORK = Network.newNetwork();
@ -97,13 +88,16 @@ public abstract class PipelineTestEnvironment extends TestLogger {
@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List<String> getFlinkVersion() {
return Arrays.asList("1.17.1", "1.18.0");
return Arrays.asList("1.17.2", "1.18.1", "1.19.0");
}
@Before
public void before() throws Exception {
LOG.info("Starting containers...");
jobManagerConsumer = new ToStringConsumer();
String flinkProperties = getFlinkProperties(flinkVersion);
jobManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
@ -111,7 +105,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
.withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withEnv("FLINK_PROPERTIES", flinkProperties)
.withLogConsumer(jobManagerConsumer);
taskManagerConsumer = new ToStringConsumer();
taskManager =
@ -120,7 +114,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
.withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withEnv("FLINK_PROPERTIES", flinkProperties)
.dependsOn(jobManager)
.withLogConsumer(taskManagerConsumer);
@ -246,4 +240,37 @@ public abstract class PipelineTestEnvironment extends TestLogger {
protected String getFlinkDockerImageTag() {
return String.format("flink:%s-scala_2.12", flinkVersion);
}
/**
 * Parses a dotted version string (e.g. {@code "1.19.0"}) into a comparable {@link Version}.
 *
 * <p>Only the first three numeric components are considered; missing components are padded
 * with zero so short strings like {@code "1.19"} are also accepted instead of throwing
 * {@link IndexOutOfBoundsException}.
 *
 * @param version dotted version string with 1-3 numeric parts
 * @return a {@link Version} carrying the major/minor/patch numbers
 */
private static Version parseVersion(String version) {
    List<Integer> versionParts =
            Arrays.stream(version.split("\\."))
                    .map(Integer::valueOf)
                    .limit(3)
                    .collect(Collectors.toList());
    // Pad to exactly three components so get(1)/get(2) below never go out of bounds.
    while (versionParts.size() < 3) {
        versionParts.add(0);
    }
    return new Version(
            versionParts.get(0), versionParts.get(1), versionParts.get(2), null, null, null);
}
/**
 * Builds the {@code FLINK_PROPERTIES} environment value handed to the Flink containers.
 *
 * <p>The JVM option key differs by Flink version: 1.17 renamed {@code env.java.opts} to
 * {@code env.java.opts.all}. The oracle timezone option itself is needed for oracle-cdc
 * tests — see https://stackoverflow.com/a/47062742/4915129.
 *
 * @param flinkVersion the Flink version the containers run, e.g. {@code "1.19.0"}
 * @return newline-joined Flink configuration properties
 */
private static String getFlinkProperties(String flinkVersion) {
    boolean atLeast117 = parseVersion(flinkVersion).compareTo(parseVersion("1.17.0")) >= 0;
    // Flink 1.17 renames `env.java.opts` to `env.java.opts.all`; older versions keep the
    // legacy key (their support might be dropped in the near future).
    String javaOptsConfig =
            atLeast117
                    ? "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false"
                    : "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false";
    return String.join(
            "\n",
            "jobmanager.rpc.address: jobmanager",
            "taskmanager.numberOfTaskSlots: 10",
            "parallelism.default: 4",
            "execution.checkpointing.interval: 300",
            javaOptsConfig);
}
}

@ -28,13 +28,13 @@ limitations under the License.
<artifactId>flink-cdc-source-e2e-tests</artifactId>
<properties>
<flink-1.14>1.14.6</flink-1.14>
<flink-1.15>1.15.4</flink-1.15>
<flink-1.16>1.16.2</flink-1.16>
<flink-1.17>1.17.1</flink-1.17>
<flink-1.18>1.18.0</flink-1.18>
<flink-1.16>1.16.3</flink-1.16>
<flink-1.17>1.17.2</flink-1.17>
<flink-1.18>1.18.1</flink-1.18>
<flink-1.19>1.19.0</flink-1.19>
<jdbc.version-1.17>3.1.1-1.17</jdbc.version-1.17>
<jdbc.version-1.18>3.1.1-1.17</jdbc.version-1.18>
<jdbc.version-1.18>3.1.2-1.18</jdbc.version-1.18>
<jdbc.version-1.19>3.1.2-1.18</jdbc.version-1.19>
<mysql.driver.version>8.0.27</mysql.driver.version>
<postgresql.driver.version>42.5.1</postgresql.driver.version>
</properties>
@ -237,21 +237,11 @@ limitations under the License.
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink-1.14}</version>
<destFileName>jdbc-connector_${flink-1.14}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink-1.15}</version>
<destFileName>jdbc-connector_${flink-1.15}.jar</destFileName>
<version>${flink-1.16}</version>
<destFileName>jdbc-connector_${flink-1.16}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
@ -260,8 +250,8 @@ limitations under the License.
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink-1.16}</version>
<destFileName>jdbc-connector_${flink-1.16}.jar</destFileName>
<version>${jdbc.version-1.17}</version>
<destFileName>jdbc-connector_${flink-1.17}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
@ -270,8 +260,8 @@ limitations under the License.
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${jdbc.version-1.17}</version>
<destFileName>jdbc-connector_${flink-1.17}.jar</destFileName>
<version>${jdbc.version-1.18}</version>
<destFileName>jdbc-connector_${flink-1.18}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
@ -280,8 +270,8 @@ limitations under the License.
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${jdbc.version-1.18}</version>
<destFileName>jdbc-connector_${flink-1.18}.jar</destFileName>
<version>${jdbc.version-1.19}</version>
<destFileName>jdbc-connector_${flink-1.19}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>

@ -31,6 +31,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
import com.fasterxml.jackson.core.Version;
import com.github.dockerjava.api.DockerClient;
import org.junit.After;
import org.junit.AfterClass;
@ -62,6 +63,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkState;
@ -80,17 +82,6 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
private static final String FLINK_BIN = "bin";
private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
private static final String FLINK_PROPERTIES =
String.join(
"\n",
Arrays.asList(
"jobmanager.rpc.address: jobmanager",
"taskmanager.numberOfTaskSlots: 10",
"parallelism.default: 4",
"execution.checkpointing.interval: 10000",
// this is needed for oracle-cdc tests.
// see https://stackoverflow.com/a/47062742/4915129
"env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
// ------------------------------------------------------------------------------------------
// MySQL Variables (we always use MySQL as the sink for easier verifying)
@ -129,17 +120,19 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List<String> getFlinkVersion() {
return Arrays.asList("1.14.6", "1.15.4", "1.16.2", "1.17.1", "1.18.0");
return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0");
}
private static final List<String> FLINK_VERSION_WITH_SCALA_212 =
Arrays.asList("1.15.4", "1.16.2", "1.17.1", "1.18.0");
Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.0");
@Before
public void before() {
mysqlInventoryDatabase.createAndInitialize();
jdbcJar = TestUtils.getResource(getJdbcConnectorResourceName());
String flinkProperties = getFlinkProperties(flinkVersion);
LOG.info("Starting containers...");
jobManager =
new GenericContainer<>(getFlinkDockerImageTag())
@ -148,7 +141,7 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
.withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withEnv("FLINK_PROPERTIES", flinkProperties)
.withLogConsumer(new Slf4jLogConsumer(LOG));
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
@ -156,7 +149,7 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
.withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withEnv("FLINK_PROPERTIES", flinkProperties)
.dependsOn(jobManager)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@ -325,4 +318,37 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
protected String getJdbcConnectorResourceName() {
return String.format("jdbc-connector_%s.jar", flinkVersion);
}
/**
 * Parses a dotted version string (e.g. {@code "1.18.1"}) into a comparable {@link Version}.
 *
 * <p>Only the first three numeric components are considered; missing components are padded
 * with zero so short strings like {@code "1.18"} are also accepted instead of throwing
 * {@link IndexOutOfBoundsException}.
 *
 * @param version dotted version string with 1-3 numeric parts
 * @return a {@link Version} carrying the major/minor/patch numbers
 */
private static Version parseVersion(String version) {
    List<Integer> versionParts =
            Arrays.stream(version.split("\\."))
                    .map(Integer::valueOf)
                    .limit(3)
                    .collect(Collectors.toList());
    // Pad to exactly three components so get(1)/get(2) below never go out of bounds.
    while (versionParts.size() < 3) {
        versionParts.add(0);
    }
    return new Version(
            versionParts.get(0), versionParts.get(1), versionParts.get(2), null, null, null);
}
/**
 * Builds the {@code FLINK_PROPERTIES} environment value handed to the Flink containers.
 *
 * <p>The JVM option key differs by Flink version: 1.17 renamed {@code env.java.opts} to
 * {@code env.java.opts.all}. The oracle timezone option itself is needed for oracle-cdc
 * tests — see https://stackoverflow.com/a/47062742/4915129.
 *
 * @param flinkVersion the Flink version the containers run, e.g. {@code "1.19.0"}
 * @return newline-joined Flink configuration properties
 */
private static String getFlinkProperties(String flinkVersion) {
    String javaOptsConfig;
    Version version = parseVersion(flinkVersion);
    if (version.compareTo(parseVersion("1.17.0")) >= 0) {
        // Flink 1.17 renames `env.java.opts` to `env.java.opts.all`
        javaOptsConfig = "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false";
    } else {
        // Legacy Flink version, might drop their support in near future
        javaOptsConfig = "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false";
    }
    return String.join(
            "\n",
            Arrays.asList(
                    "jobmanager.rpc.address: jobmanager",
                    "taskmanager.numberOfTaskSlots: 10",
                    "parallelism.default: 4",
                    // Keep the 10000ms interval this class used before the refactoring;
                    // the previously inlined FLINK_PROPERTIES constant configured
                    // `execution.checkpointing.interval: 10000`, and the `300` value was
                    // copied from PipelineTestEnvironment by mistake.
                    "execution.checkpointing.interval: 10000",
                    javaOptsConfig));
}
}

Loading…
Cancel
Save