diff --git a/flink-cdc-e2e-tests/flink-cdc-e2e-utils/pom.xml b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/pom.xml new file mode 100644 index 000000000..2e38a55ed --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/pom.xml @@ -0,0 +1,31 @@ + + + + 4.0.0 + + flink-cdc-e2e-tests + org.apache.flink + ${revision} + + + flink-cdc-e2e-utils + flink-cdc-e2e-utils + jar + \ No newline at end of file diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/JdbcProxy.java b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/JdbcProxy.java similarity index 88% rename from flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/JdbcProxy.java rename to flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/JdbcProxy.java index c0f0167ae..cb1735de6 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/JdbcProxy.java +++ b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/JdbcProxy.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.flink.cdc.connectors.tests.utils; +package org.apache.flink.cdc.common.test.utils; import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; import java.sql.Connection; import java.sql.DriverManager; @@ -28,8 +29,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.junit.Assert.assertArrayEquals; - /** Proxy to communicate with database using JDBC protocol. */ public class JdbcProxy { @@ -45,11 +44,11 @@ public class JdbcProxy { this.driverClass = driverClass; } - public void checkResult(List expectedResult, String table, String[] fields) + private void checkResult(List expectedResult, String table, String[] fields) throws SQLException, ClassNotFoundException { Class.forName(driverClass); try (Connection dbConn = DriverManager.getConnection(url, userName, password); - PreparedStatement statement = dbConn.prepareStatement("select * from " + table); + PreparedStatement statement = dbConn.prepareStatement("SELECT * FROM " + table); ResultSet resultSet = statement.executeQuery()) { List results = new ArrayList<>(); while (resultSet.next()) { @@ -68,10 +67,14 @@ public class JdbcProxy { Collections.sort(results); Collections.sort(expectedResult); // make it easier to check the result - assertArrayEquals(expectedResult.toArray(), results.toArray()); + Assert.assertArrayEquals(expectedResult.toArray(), results.toArray()); } } + /** + * Check the result of a table with specified fields. If the result is not as expected, it will + * retry until timeout. + */ public void checkResultWithTimeout( List expectedResult, String table, String[] fields, long timeout) throws Exception { diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/ParameterProperty.java b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/ParameterProperty.java similarity index 96% rename from flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/ParameterProperty.java rename to flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/ParameterProperty.java index ac13e9519..55abbd5a7 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/ParameterProperty.java +++ b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/ParameterProperty.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.cdc.connectors.tests.utils; +package org.apache.flink.cdc.common.test.utils; import java.util.function.Function; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/TestUtils.java b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java similarity index 80% rename from flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/TestUtils.java rename to flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java index bb6492eee..7923bb9b9 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/TestUtils.java +++ b/flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/TestUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.cdc.connectors.tests.utils; +package org.apache.flink.cdc.common.test.utils; import java.io.FileNotFoundException; import java.io.IOException; @@ -28,8 +28,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; /** General test utilities. */ -public enum TestUtils { - ; +public class TestUtils { private static final ParameterProperty MODULE_DIRECTORY = new ParameterProperty<>("moduleDir", Paths::get); @@ -37,19 +36,28 @@ public enum TestUtils { /** * Searches for a resource file matching the given regex in the given directory. This method is * primarily intended to be used for the initialization of static {@link Path} fields for - * resource file(i.e. jar, config file) that reside in the modules {@code target} directory. + * resource file(i.e. jar, config file). if resolvePaths is empty, this method will search file + * under the modules {@code target} directory. if resolvePaths is not empty, this method will + * search file under resolvePaths of current project. * * @param resourceNameRegex regex pattern to match against - * @return Path pointing to the matching jar + * @param resolvePaths an array of resolve paths of current project + * @return Path pointing to the matching file * @throws RuntimeException if none or multiple resource files could be found */ - public static Path getResource(final String resourceNameRegex) { + public static Path getResource(final String resourceNameRegex, String... resolvePaths) { // if the property is not set then we are most likely running in the IDE, where the working // directory is the // module of the test that is currently running, which is exactly what we want - Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath()); + Path path = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath()); + if (resolvePaths != null && resolvePaths.length > 0) { + path = path.getParent().getParent(); + for (String resolvePath : resolvePaths) { + path = path.resolve(resolvePath); + } + } - try (Stream dependencyResources = Files.walk(moduleDirectory)) { + try (Stream dependencyResources = Files.walk(path)) { final List matchingResources = dependencyResources .filter( diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index 854bab714..e785ba0a5 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -27,4 +27,210 @@ limitations under the License. flink-cdc-pipeline-e2e-tests + + 1.17.1 + 1.18.0 + 8.0.27 + 1.2.9_flink-${flink.major.version} + + + + + org.apache.flink + flink-cdc-e2e-utils + ${project.version} + test-jar + test + + + + + mysql + mysql-connector-java + + + com.google.protobuf + protobuf-java + + + ${mysql.driver.version} + test + + + + + org.apache.flink + flink-cdc-dist + ${project.version} + test + + + org.apache.flink + flink-connector-mysql-cdc + ${project.version} + test-jar + test + + + org.apache.flink + flink-cdc-pipeline-connector-values + ${project.version} + test + + + org.apache.flink + flink-cdc-pipeline-connector-mysql + ${project.version} + test-jar + test + + + org.apache.flink + flink-cdc-pipeline-connector-doris + ${project.version} + test + + + org.apache.flink + flink-cdc-pipeline-connector-starrocks + ${project.version} + test + + + org.apache.flink + flink-connector-test-util + ${project.version} + test + + + + + org.testcontainers + mysql + ${testcontainers.version} + test + + + + + + + src/test/resources + + **/flink-cdc.sh + **/flink-cdc.yaml + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + default-test + none + + + integration-tests + none + + + end-to-end-tests + integration-test + + test + + + + **/*.* + + 1 + + ${project.basedir} + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-jars + package + + copy + + + + + + + + mysql + mysql-connector-java + ${mysql.driver.version} + mysql-driver.jar + jar + ${project.build.directory}/dependencies + + + + + org.apache.flink + flink-cdc-dist + ${project.version} + flink-cdc-dist.jar + jar + ${project.build.directory}/dependencies + + + + + org.apache.flink + flink-cdc-pipeline-connector-values + ${project.version} + values-cdc-pipeline-connector.jar + jar + ${project.build.directory}/dependencies + + + + + org.apache.flink + flink-cdc-pipeline-connector-mysql + ${project.version} + mysql-cdc-pipeline-connector.jar + jar + ${project.build.directory}/dependencies + + + + + org.apache.flink + flink-cdc-pipeline-connector-doris + ${project.version} + doris-cdc-pipeline-connector.jar + jar + ${project.build.directory}/dependencies + + + + + org.apache.flink + flink-cdc-pipeline-connector-starrocks + ${project.version} + starrocks-cdc-pipeline-connector.jar + jar + ${project.build.directory}/dependencies + + + + + + + \ No newline at end of file diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java new file mode 100644 index 000000000..4f0d9e002 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; + +/** End-to-end tests for mysql cdc pipeline job. */ +@RunWith(Parameterized.class) +public class MysqlE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(MysqlE2eITCase.class); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; + + @ClassRule + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected final UniqueDatabase mysqlInventoryDatabase = + new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @Before + public void before() throws Exception { + super.before(); + mysqlInventoryDatabase.createAndInitialize(); + } + + @After + public void after() { + super.after(); + mysqlInventoryDatabase.dropDatabase(); + } + + @Test + public void testSyncWholeDatabase() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " parallelism: 1", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName()); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + waitUtilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + 60000L); + waitUtilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + 60000L); + List expectedEvents = + Arrays.asList( + String.format( + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING,`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + validateResult(expectedEvents); + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + // modify table schema + stat.execute("ALTER TABLE products ADD COLUMN new_col INT;"); + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110 + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111 + stat.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + stat.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + stat.execute("DELETE FROM products WHERE id=111;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + waitUtilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + 60000L); + + expectedEvents = + Arrays.asList( + String.format( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", + mysqlInventoryDatabase.getDatabaseName()), + String.format( + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + validateResult(expectedEvents); + } + + private void validateResult(List expectedEvents) { + String stdout = taskManagerConsumer.toUtf8String(); + for (String event : expectedEvents) { + if (!stdout.contains(event)) { + throw new RuntimeException( + "failed to get specific event: " + event + " from stdout: " + stdout); + } + } + } + + private void waitUtilSpecificEvent(String event, long timeout) throws Exception { + boolean result = false; + long endTimeout = System.currentTimeMillis() + timeout; + while (System.currentTimeMillis() < endTimeout) { + String stdout = taskManagerConsumer.toUtf8String(); + if (stdout.contains(event)) { + result = true; + break; + } + Thread.sleep(1000); + } + if (!result) { + throw new TimeoutException( + "failed to get specific event: " + + event + + " from stdout: " + + taskManagerConsumer.toUtf8String()); + } + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java new file mode 100644 index 000000000..72dff1818 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests.utils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container.ExecResult; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.ToStringConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Test environment running pipeline job on Flink containers. */ +@RunWith(Parameterized.class) +public abstract class PipelineTestEnvironment extends TestLogger { + private static final Logger LOG = LoggerFactory.getLogger(PipelineTestEnvironment.class); + + @Parameterized.Parameter public String flinkVersion; + + // ------------------------------------------------------------------------------------------ + // Flink Variables + // ------------------------------------------------------------------------------------------ + public static final int JOB_MANAGER_REST_PORT = 8081; + public static final String INTER_CONTAINER_JM_ALIAS = "jobmanager"; + public static final String INTER_CONTAINER_TM_ALIAS = "taskmanager"; + public static final String FLINK_PROPERTIES = + String.join( + "\n", + Arrays.asList( + "jobmanager.rpc.address: jobmanager", + "taskmanager.numberOfTaskSlots: 10", + "parallelism.default: 4", + "execution.checkpointing.interval: 300", + // this is needed for oracle-cdc tests. + // see https://stackoverflow.com/a/47062742/4915129 + "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false")); + + @ClassRule public static final Network NETWORK = Network.newNetwork(); + + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Nullable protected RestClusterClient restClusterClient; + protected GenericContainer jobManager; + protected GenericContainer taskManager; + + protected ToStringConsumer jobManagerConsumer; + + protected ToStringConsumer taskManagerConsumer; + + @Parameterized.Parameters(name = "flinkVersion: {0}") + public static List getFlinkVersion() { + return Arrays.asList("1.17.1", "1.18.0"); + } + + @Before + public void before() throws Exception { + LOG.info("Starting containers..."); + jobManagerConsumer = new ToStringConsumer(); + jobManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withExtraHost("host.docker.internal", "host-gateway") + .withNetworkAliases(INTER_CONTAINER_JM_ALIAS) + .withExposedPorts(JOB_MANAGER_REST_PORT) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withLogConsumer(jobManagerConsumer); + taskManagerConsumer = new ToStringConsumer(); + taskManager = + new GenericContainer<>(getFlinkDockerImageTag()) + .withCommand("taskmanager") + .withExtraHost("host.docker.internal", "host-gateway") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) + .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .dependsOn(jobManager) + .withLogConsumer(taskManagerConsumer); + + Startables.deepStart(Stream.of(jobManager)).join(); + Startables.deepStart(Stream.of(taskManager)).join(); + LOG.info("Containers are started."); + } + + @After + public void after() { + if (restClusterClient != null) { + restClusterClient.close(); + } + if (jobManager != null) { + jobManager.stop(); + } + if (taskManager != null) { + taskManager.stop(); + } + } + + /** Allow overriding the default flink properties. */ + public void overrideFlinkProperties(String properties) { + jobManager.withEnv("FLINK_PROPERTIES", properties); + taskManager.withEnv("FLINK_PROPERTIES", properties); + } + + /** + * Submits a SQL job to the running cluster. + * + *

NOTE: You should not use {@code '\t'}. + */ + public void submitPipelineJob(String pipelineJob, Path... jars) + throws IOException, InterruptedException { + for (Path jar : jars) { + jobManager.copyFileToContainer( + MountableFile.forHostPath(jar), "/tmp/flinkCDC/lib/" + jar.getFileName()); + } + jobManager.copyFileToContainer( + MountableFile.forHostPath( + TestUtils.getResource("flink-cdc.sh", "flink-cdc-dist", "src"), 755), + "/tmp/flinkCDC/bin/flink-cdc.sh"); + jobManager.copyFileToContainer( + MountableFile.forHostPath( + TestUtils.getResource("flink-cdc.yaml", "flink-cdc-dist", "src"), 755), + "/tmp/flinkCDC/conf/flink-cdc.yaml"); + jobManager.copyFileToContainer( + MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")), + "/tmp/flinkCDC/lib/flink-cdc-dist.jar"); + Path script = temporaryFolder.newFile().toPath(); + Files.write(script, pipelineJob.getBytes()); + jobManager.copyFileToContainer( + MountableFile.forHostPath(script), "/tmp/flinkCDC/conf/pipeline.yaml"); + String commands = + "/tmp/flinkCDC/bin/flink-cdc.sh /tmp/flinkCDC/conf/pipeline.yaml --flink-home /opt/flink"; + ExecResult execResult = jobManager.execInContainer("bash", "-c", commands); + LOG.info(execResult.getStdout()); + LOG.error(execResult.getStderr()); + if (execResult.getExitCode() != 0) { + throw new AssertionError("Failed when submitting the pipeline job."); + } + } + + /** + * Get {@link RestClusterClient} connected to this FlinkContainer. + * + *

This method lazily initializes the REST client on-demand. + */ + public RestClusterClient getRestClusterClient() { + if (restClusterClient != null) { + return restClusterClient; + } + checkState( + jobManager.isRunning(), + "Cluster client should only be retrieved for a running cluster"); + try { + final Configuration clientConfiguration = new Configuration(); + clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost()); + clientConfiguration.set( + RestOptions.PORT, jobManager.getMappedPort(JOB_MANAGER_REST_PORT)); + this.restClusterClient = + new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance()); + } catch (Exception e) { + throw new IllegalStateException( + "Failed to create client for Flink container cluster", e); + } + return restClusterClient; + } + + public void waitUntilJobRunning(Duration timeout) { + RestClusterClient clusterClient = getRestClusterClient(); + Deadline deadline = Deadline.fromNow(timeout); + while (deadline.hasTimeLeft()) { + Collection jobStatusMessages; + try { + jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Error when fetching job status.", e); + continue; + } + if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) { + JobStatusMessage message = jobStatusMessages.iterator().next(); + JobStatus jobStatus = message.getJobState(); + if (jobStatus.isTerminalState()) { + throw new ValidationException( + String.format( + "Job has been terminated! JobName: %s, JobID: %s, Status: %s", + message.getJobName(), + message.getJobId(), + message.getJobState())); + } else if (jobStatus == JobStatus.RUNNING) { + return; + } + } + } + } + + protected String getFlinkDockerImageTag() { + return String.format("flink:%s-scala_2.12", flinkVersion); + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory.sql new file mode 100644 index 000000000..4e9b44f71 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory.sql @@ -0,0 +1,53 @@ +-- Copyright 2023 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: mysql_inventory +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate our products using a single insert with many rows +CREATE TABLE products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT, + enum_c enum('red', 'white') default 'red', -- test some complex types as well, + json_c JSON, -- because we use additional dependencies to deserialize complex types. + point_c POINT +); +ALTER TABLE products AUTO_INCREMENT = 101; + +INSERT INTO products +VALUES (default,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": "value1"}', ST_GeomFromText('POINT(1 1)')), + (default,"car battery","12V car battery",8.1, 'white', '{"key2": "value2"}', ST_GeomFromText('POINT(2 2)')), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 3)')), + (default,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": "value4"}', ST_GeomFromText('POINT(4 4)')), + (default,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", "k2": "v2"}', ST_GeomFromText('POINT(5 5)')), + (default,"hammer","16oz carpenter's hammer",1.0, null, null, null), + (default,"rocks","box of assorted rocks",5.3, null, null, null), + (default,"jacket","water resistent black wind breaker",0.1, null, null, null), + (default,"spare tire","24 inch spare tire",22.2, null, null, null); + +-- Create and populate our customers using a single insert with many rows +CREATE TABLE customers ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (104,"user_4","Shanghai","123567891234"); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/my.cnf b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/my.cnf new file mode 100644 index 000000000..11d6c94ee --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/my.cnf @@ -0,0 +1,62 @@ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +#secure-file-priv=/var/lib/mysql-files +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/setup.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/setup.sql new file mode 100644 index 000000000..8586a8489 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/mysql/setup.sql @@ -0,0 +1,28 @@ +-- Copyright 2023 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- In production you would almost certainly limit the replication user must be on the follower (slave) machine, +-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'. +-- However, in this database we'll grant 2 users different privileges: +-- +-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing) +-- 2) 'mysqluser' - all privileges +-- +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%'; +CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw'; +GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%'; + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: emptydb +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE emptydb; diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/log4j2-test.properties b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..a9d045e0e --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/log4j2-test.properties @@ -0,0 +1,26 @@ +################################################################################ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml index 795f7938b..adb3e0f82 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml @@ -40,6 +40,14 @@ limitations under the License. + + org.apache.flink + flink-cdc-e2e-utils + ${project.version} + test-jar + test + + mysql diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java index e7c898e9f..9094da69e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/Db2E2eITCase.java @@ -17,9 +17,9 @@ package org.apache.flink.cdc.connectors.tests; +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; -import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy; -import org.apache.flink.cdc.connectors.tests.utils.TestUtils; import org.junit.After; import org.junit.Before; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java index 610897171..f3db2624d 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java @@ -17,10 +17,10 @@ package org.apache.flink.cdc.connectors.tests; +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; -import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy; -import org.apache.flink.cdc.connectors.tests.utils.TestUtils; import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MySqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MySqlE2eITCase.java index 2a5f7a10f..bf54cb4d7 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MySqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MySqlE2eITCase.java @@ -17,9 +17,9 @@ package org.apache.flink.cdc.connectors.tests; +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; -import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy; -import org.apache.flink.cdc.connectors.tests.utils.TestUtils; import org.junit.Test; import org.slf4j.Logger; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java index e3d329882..e257311a5 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java @@ -17,9 +17,9 @@ package org.apache.flink.cdc.connectors.tests; +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; -import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy; -import org.apache.flink.cdc.connectors.tests.utils.TestUtils; import org.junit.Before; import org.junit.ClassRule; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java index 585778f2d..f72f03be7 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OracleE2eITCase.java @@ -17,9 +17,9 @@ package org.apache.flink.cdc.connectors.tests; +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; -import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy; -import org.apache.flink.cdc.connectors.tests.utils.TestUtils; import org.junit.After; import org.junit.Before; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/PostgresE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/PostgresE2eITCase.java index aa4f3daf1..2580f8c56 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/PostgresE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/PostgresE2eITCase.java @@ -17,9 +17,9 @@ package org.apache.flink.cdc.connectors.tests; +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; -import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy; -import org.apache.flink.cdc.connectors.tests.utils.TestUtils; import org.junit.After; import org.junit.Before; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/SqlServerE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/SqlServerE2eITCase.java index ad9bc6acb..8c832aa5e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/SqlServerE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/SqlServerE2eITCase.java @@ -17,9 +17,9 @@ package org.apache.flink.cdc.connectors.tests; +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; -import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy; -import org.apache.flink.cdc.connectors.tests.utils.TestUtils; import org.junit.After; import org.junit.Before; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java index f64d33ea1..232d6f7b7 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java @@ -17,9 +17,9 @@ package org.apache.flink.cdc.connectors.tests; +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; -import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy; -import org.apache.flink.cdc.connectors.tests.utils.TestUtils; import org.junit.After; import org.junit.Before; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/VitessE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/VitessE2eITCase.java index b2e94622c..ce0d31132 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/VitessE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/VitessE2eITCase.java @@ -17,9 +17,9 @@ package org.apache.flink.cdc.connectors.tests; +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; -import org.apache.flink.cdc.connectors.tests.utils.JdbcProxy; -import org.apache.flink.cdc.connectors.tests.utils.TestUtils; import org.apache.flink.cdc.connectors.vitess.VitessTestBase; import org.apache.flink.cdc.connectors.vitess.container.VitessContainer; diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java index bef68a676..650e205b8 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java @@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.tests.utils; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.cdc.common.test.utils.TestUtils; import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml index b22be5c23..3ac5fe598 100644 --- a/flink-cdc-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/pom.xml @@ -30,6 +30,7 @@ limitations under the License. pom + flink-cdc-e2e-utils flink-cdc-source-e2e-tests flink-cdc-pipeline-e2e-tests