[e2e][pipeline-connector][mysql] Add e2e test for mysql pipeline connector

This closes #2997.
pull/3248/head
Kunni 9 months ago committed by GitHub
parent 01ec7da989
commit bc7031bd71

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-cdc-e2e-tests</artifactId>
<groupId>org.apache.flink</groupId>
<version>${revision}</version>
</parent>
<artifactId>flink-cdc-e2e-utils</artifactId>
<name>flink-cdc-e2e-utils</name>
<packaging>jar</packaging>
</project>

@ -15,9 +15,10 @@
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.tests.utils;
package org.apache.flink.cdc.common.test.utils;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import java.sql.Connection;
import java.sql.DriverManager;
@ -28,8 +29,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertArrayEquals;
/** Proxy to communicate with database using JDBC protocol. */
public class JdbcProxy {
@ -45,11 +44,11 @@ public class JdbcProxy {
this.driverClass = driverClass;
}
public void checkResult(List<String> expectedResult, String table, String[] fields)
private void checkResult(List<String> expectedResult, String table, String[] fields)
throws SQLException, ClassNotFoundException {
Class.forName(driverClass);
try (Connection dbConn = DriverManager.getConnection(url, userName, password);
PreparedStatement statement = dbConn.prepareStatement("select * from " + table);
PreparedStatement statement = dbConn.prepareStatement("SELECT * FROM " + table);
ResultSet resultSet = statement.executeQuery()) {
List<String> results = new ArrayList<>();
while (resultSet.next()) {
@ -68,10 +67,14 @@ public class JdbcProxy {
Collections.sort(results);
Collections.sort(expectedResult);
// make it easier to check the result
assertArrayEquals(expectedResult.toArray(), results.toArray());
Assert.assertArrayEquals(expectedResult.toArray(), results.toArray());
}
}
/**
* Checks the result of a table with the specified fields. If the result is not as expected, this
* method retries until the timeout is reached.
*/
public void checkResultWithTimeout(
List<String> expectedResult, String table, String[] fields, long timeout)
throws Exception {
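For context, a minimal usage sketch of the relocated JdbcProxy. The connection URL, credentials, table name, and expected rows below are hypothetical, and the constructor is assumed to take (url, user, password, driverClass) as suggested by the fields above; java.util.Arrays and java.util.List imports are assumed.

// Hypothetical example: verify the rows of a sink table, retrying until a 60s timeout.
JdbcProxy proxy =
        new JdbcProxy(
                "jdbc:mysql://localhost:3306/inventory", // assumed JDBC URL
                "mysqluser",
                "mysqlpw",
                "com.mysql.cj.jdbc.Driver");
List<String> expected =
        Arrays.asList(
                "101,user_1,Shanghai,123567891234",
                "102,user_2,Shanghai,123567891234");
// Re-runs the SELECT and comparison until the rows match or the timeout elapses.
proxy.checkResultWithTimeout(
        expected, "customers", new String[] {"id", "name", "address", "phone_number"}, 60000L);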

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.tests.utils;
package org.apache.flink.cdc.common.test.utils;
import java.util.function.Function;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.tests.utils;
package org.apache.flink.cdc.common.test.utils;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -28,8 +28,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/** General test utilities. */
public enum TestUtils {
;
public class TestUtils {
private static final ParameterProperty<Path> MODULE_DIRECTORY =
new ParameterProperty<>("moduleDir", Paths::get);
@ -37,19 +36,28 @@ public enum TestUtils {
/**
* Searches for a resource file matching the given regex in the given directory. This method is
* primarily intended to be used for the initialization of static {@link Path} fields for
* resource file(i.e. jar, config file) that reside in the modules {@code target} directory.
* resource files (i.e. jar, config file). If resolvePaths is empty, this method searches for the
* file under the module's {@code target} directory. If resolvePaths is not empty, this method
* searches for the file under the given resolvePaths of the current project.
*
* @param resourceNameRegex regex pattern to match against
* @return Path pointing to the matching jar
* @param resolvePaths an array of paths, relative to the project root, under which to search
* @return Path pointing to the matching file
* @throws RuntimeException if none or multiple resource files could be found
*/
public static Path getResource(final String resourceNameRegex) {
public static Path getResource(final String resourceNameRegex, String... resolvePaths) {
// if the property is not set then we are most likely running in the IDE, where the working
// directory is the
// module of the test that is currently running, which is exactly what we want
Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
Path path = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
if (resolvePaths != null && resolvePaths.length > 0) {
path = path.getParent().getParent();
for (String resolvePath : resolvePaths) {
path = path.resolve(resolvePath);
}
}
try (Stream<Path> dependencyResources = Files.walk(moduleDirectory)) {
try (Stream<Path> dependencyResources = Files.walk(path)) {
final List<Path> matchingResources =
dependencyResources
.filter(
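To make the new lookup behavior concrete, a short sketch follows; it mirrors the calls made later in this change (MysqlE2eITCase and PipelineTestEnvironment), so the file names are those used by the pipeline e2e tests rather than invented ones.

// Without resolve paths: walk this module's target/ directory,
// e.g. to locate a connector jar copied there by the maven-dependency-plugin.
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");

// With resolve paths: walk <project-root>/flink-cdc-dist/src instead,
// e.g. to pick up the flink-cdc.sh launcher script checked into another module.
Path cdcLauncher = TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src");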

@ -27,4 +27,210 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-e2e-tests</artifactId>
<properties>
<flink-1.17>1.17.1</flink-1.17>
<flink-1.18>1.18.0</flink-1.18>
<mysql.driver.version>8.0.27</mysql.driver.version>
<starrocks.connector.version>1.2.9_flink-${flink.major.version}</starrocks.connector.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-e2e-utils</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- Drivers -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
<version>${mysql.driver.version}</version>
<scope>test</scope>
</dependency>
<!-- CDC connectors test utils -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-dist</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-values</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- testcontainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<excludes>
<exclude>**/flink-cdc.sh</exclude>
<exclude>**/flink-cdc.yaml</exclude>
</excludes>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>default-test</id>
<phase>none</phase>
</execution>
<execution>
<id>integration-tests</id>
<phase>none</phase>
</execution>
<execution>
<id>end-to-end-tests</id>
<phase>integration-test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<includes>
<include>**/*.*</include>
</includes>
<forkCount>1</forkCount>
<systemPropertyVariables>
<moduleDir>${project.basedir}</moduleDir>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-jars</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
</execution>
</executions>
<configuration>
<artifactItems>
<artifactItem>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.driver.version}</version>
<destFileName>mysql-driver.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-dist</artifactId>
<version>${project.version}</version>
<destFileName>flink-cdc-dist.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-values</artifactId>
<version>${project.version}</version>
<destFileName>values-cdc-pipeline-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
<version>${project.version}</version>
<destFileName>mysql-cdc-pipeline-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
<version>${project.version}</version>
<destFileName>doris-cdc-pipeline-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
<version>${project.version}</version>
<destFileName>starrocks-cdc-pipeline-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,267 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.pipeline.tests;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
/** End-to-end tests for the MySQL CDC pipeline job. */
@RunWith(Parameterized.class)
public class MysqlE2eITCase extends PipelineTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(MysqlE2eITCase.class);
// ------------------------------------------------------------------------------------------
// MySQL Variables (we always use MySQL as the data source for easier verification)
// ------------------------------------------------------------------------------------------
protected static final String MYSQL_TEST_USER = "mysqluser";
protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
@ClassRule
public static final MySqlContainer MYSQL =
(MySqlContainer)
new MySqlContainer(
MySqlVersion.V8_0) // v8 supports both ARM and AMD architectures
.withConfigurationOverride("docker/mysql/my.cnf")
.withSetupSQL("docker/mysql/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
protected final UniqueDatabase mysqlInventoryDatabase =
new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
@Before
public void before() throws Exception {
super.before();
mysqlInventoryDatabase.createAndInitialize();
}
@After
public void after() {
super.after();
mysqlInventoryDatabase.dropDatabase();
}
@Test
public void testSyncWholeDatabase() throws Exception {
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName());
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUtilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
60000L);
waitUtilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
60000L);
List<String> expectedEvents =
Arrays.asList(
String.format(
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING,`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()));
validateResult(expectedEvents);
LOG.info("Begin incremental reading stage.");
// generate binlogs
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stat = conn.createStatement()) {
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
// modify table schema
stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
stat.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110
stat.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111
stat.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
stat.execute("DELETE FROM products WHERE id=111;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
waitUtilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
60000L);
expectedEvents =
Arrays.asList(
String.format(
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
String.format(
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()));
validateResult(expectedEvents);
}
private void validateResult(List<String> expectedEvents) {
String stdout = taskManagerConsumer.toUtf8String();
for (String event : expectedEvents) {
if (!stdout.contains(event)) {
throw new RuntimeException(
"failed to get specific event: " + event + " from stdout: " + stdout);
}
}
}
private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
boolean result = false;
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
result = true;
break;
}
Thread.sleep(1000);
}
if (!result) {
throw new TimeoutException(
"failed to get specific event: "
+ event
+ " from stdout: "
+ taskManagerConsumer.toUtf8String());
}
}
}

@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.pipeline.tests.utils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.ToStringConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkState;
/** Test environment running pipeline job on Flink containers. */
@RunWith(Parameterized.class)
public abstract class PipelineTestEnvironment extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(PipelineTestEnvironment.class);
@Parameterized.Parameter public String flinkVersion;
// ------------------------------------------------------------------------------------------
// Flink Variables
// ------------------------------------------------------------------------------------------
public static final int JOB_MANAGER_REST_PORT = 8081;
public static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
public static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
public static final String FLINK_PROPERTIES =
String.join(
"\n",
Arrays.asList(
"jobmanager.rpc.address: jobmanager",
"taskmanager.numberOfTaskSlots: 10",
"parallelism.default: 4",
"execution.checkpointing.interval: 300",
// this is needed for oracle-cdc tests.
// see https://stackoverflow.com/a/47062742/4915129
"env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
@ClassRule public static final Network NETWORK = Network.newNetwork();
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient;
protected GenericContainer<?> jobManager;
protected GenericContainer<?> taskManager;
protected ToStringConsumer jobManagerConsumer;
protected ToStringConsumer taskManagerConsumer;
@Parameterized.Parameters(name = "flinkVersion: {0}")
public static List<String> getFlinkVersion() {
return Arrays.asList("1.17.1", "1.18.0");
}
@Before
public void before() throws Exception {
LOG.info("Starting containers...");
jobManagerConsumer = new ToStringConsumer();
jobManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withLogConsumer(jobManagerConsumer);
taskManagerConsumer = new ToStringConsumer();
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
.withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.dependsOn(jobManager)
.withLogConsumer(taskManagerConsumer);
Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
LOG.info("Containers are started.");
}
@After
public void after() {
if (restClusterClient != null) {
restClusterClient.close();
}
if (jobManager != null) {
jobManager.stop();
}
if (taskManager != null) {
taskManager.stop();
}
}
/** Allow overriding the default flink properties. */
public void overrideFlinkProperties(String properties) {
jobManager.withEnv("FLINK_PROPERTIES", properties);
taskManager.withEnv("FLINK_PROPERTIES", properties);
}
/**
* Submits a pipeline job to the running cluster.
*
* <p><b>NOTE:</b> You should not use {@code '\t'} in the pipeline definition.
*/
public void submitPipelineJob(String pipelineJob, Path... jars)
throws IOException, InterruptedException {
for (Path jar : jars) {
jobManager.copyFileToContainer(
MountableFile.forHostPath(jar), "/tmp/flinkCDC/lib/" + jar.getFileName());
}
jobManager.copyFileToContainer(
MountableFile.forHostPath(
TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src"), 755),
"/tmp/flinkCDC/bin/flink-cdc.sh");
jobManager.copyFileToContainer(
MountableFile.forHostPath(
TestUtils.getResource("flink-cdc.yaml", "flink-cdc-dist", "src"), 755),
"/tmp/flinkCDC/conf/flink-cdc.yaml");
jobManager.copyFileToContainer(
MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")),
"/tmp/flinkCDC/lib/flink-cdc-dist.jar");
Path script = temporaryFolder.newFile().toPath();
Files.write(script, pipelineJob.getBytes());
jobManager.copyFileToContainer(
MountableFile.forHostPath(script), "/tmp/flinkCDC/conf/pipeline.yaml");
String commands =
"/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml --flink-home /opt/flink";
ExecResult execResult = jobManager.execInContainer("bash", "-c", commands);
LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr());
if (execResult.getExitCode() != 0) {
throw new AssertionError("Failed when submitting the pipeline job.");
}
}
/**
* Get {@link RestClusterClient} connected to this FlinkContainer.
*
* <p>This method lazily initializes the REST client on-demand.
*/
public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
if (restClusterClient != null) {
return restClusterClient;
}
checkState(
jobManager.isRunning(),
"Cluster client should only be retrieved for a running cluster");
try {
final Configuration clientConfiguration = new Configuration();
clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
clientConfiguration.set(
RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
this.restClusterClient =
new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance());
} catch (Exception e) {
throw new IllegalStateException(
"Failed to create client for Flink container cluster", e);
}
return restClusterClient;
}
public void waitUntilJobRunning(Duration timeout) {
RestClusterClient<?> clusterClient = getRestClusterClient();
Deadline deadline = Deadline.fromNow(timeout);
while (deadline.hasTimeLeft()) {
Collection<JobStatusMessage> jobStatusMessages;
try {
jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Error when fetching job status.", e);
continue;
}
if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
JobStatusMessage message = jobStatusMessages.iterator().next();
JobStatus jobStatus = message.getJobState();
if (jobStatus.isTerminalState()) {
throw new ValidationException(
String.format(
"Job has been terminated! JobName: %s, JobID: %s, Status: %s",
message.getJobName(),
message.getJobId(),
message.getJobState()));
} else if (jobStatus == JobStatus.RUNNING) {
return;
}
}
}
}
protected String getFlinkDockerImageTag() {
return String.format("flink:%s-scala_2.12", flinkVersion);
}
}

@ -0,0 +1,53 @@
-- Copyright 2023 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: mysql_inventory
-- ----------------------------------------------------------------------------------------------------------------
-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight FLOAT,
enum_c enum('red', 'white') default 'red', -- test some complex types as well,
json_c JSON, -- because we use additional dependencies to deserialize complex types.
point_c POINT
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": "value1"}', ST_GeomFromText('POINT(1 1)')),
(default,"car battery","12V car battery",8.1, 'white', '{"key2": "value2"}', ST_GeomFromText('POINT(2 2)')),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 3)')),
(default,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": "value4"}', ST_GeomFromText('POINT(4 4)')),
(default,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", "k2": "v2"}', ST_GeomFromText('POINT(5 5)')),
(default,"hammer","16oz carpenter's hammer",1.0, null, null, null),
(default,"rocks","box of assorted rocks",5.3, null, null, null),
(default,"jacket","water resistent black wind breaker",0.1, null, null, null),
(default,"spare tire","24 inch spare tire",22.2, null, null, null);
-- Create and populate our customers using a single insert with many rows
CREATE TABLE customers (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(104,"user_4","Shanghai","123567891234");

@ -0,0 +1,62 @@
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
#secure-file-priv=/var/lib/mysql-files
secure-file-priv=/var/lib/mysql
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
#log-error=/var/log/mysqld.log
#pid-file=/var/run/mysqld/mysqld.pid
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on

@ -0,0 +1,28 @@
-- Copyright 2023 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- In production you would almost certainly limit the replication user to the follower (replica) machine,
-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
-- However, in this database we'll grant 2 users different privileges:
--
-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
-- 2) 'mysqluser' - all privileges
--
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: emptydb
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE emptydb;

@ -0,0 +1,26 @@
################################################################################
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level=INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n

@ -40,6 +40,14 @@ limitations under the License.
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-e2e-utils</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- Drivers -->
<dependency>
<groupId>mysql</groupId>

@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;

@ -17,10 +17,10 @@
package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;

@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.Test;
import org.slf4j.Logger;

@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;

@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;

@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;

@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;

@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.junit.After;
import org.junit.Before;

@ -17,9 +17,9 @@
package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy;
import org.apache.flink.cdc.connectors.tests.utils.TestUtils;
import org.apache.flink.cdc.connectors.vitess.VitessTestBase;
import org.apache.flink.cdc.connectors.vitess.container.VitessContainer;

@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.tests.utils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;

@ -30,6 +30,7 @@ limitations under the License.
<packaging>pom</packaging>
<modules>
<module>flink-cdc-e2e-utils</module>
<module>flink-cdc-source-e2e-tests</module>
<module>flink-cdc-pipeline-e2e-tests</module>
</modules>
