diff --git a/flink-connector-tidb-cdc/pom.xml b/flink-connector-tidb-cdc/pom.xml index b403126c0..90b0f7660 100644 --- a/flink-connector-tidb-cdc/pom.xml +++ b/flink-connector-tidb-cdc/pom.xml @@ -175,6 +175,13 @@ under the License. test + + com.alibaba + dns-cache-manipulator + 1.7.1 + test + + diff --git a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/TiDBTestBase.java b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/TiDBTestBase.java index 960af0e6a..c4d4fb551 100644 --- a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/TiDBTestBase.java +++ b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/TiDBTestBase.java @@ -20,17 +20,20 @@ package com.ververica.cdc.connectors.tidb; import org.apache.flink.test.util.AbstractTestBase; +import com.alibaba.dcm.DnsCacheManipulator; import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.DockerComposeContainer; -import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; -import java.io.File; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; @@ -54,63 +57,98 @@ public class TiDBTestBase extends AbstractTestBase { private static final Logger LOG = LoggerFactory.getLogger(TiDBTestBase.class); private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + public static final String PD_SERVICE_NAME = "pd0"; + public static final String TIKV_SERVICE_NAME = "tikv0"; + public static final String TIDB_SERVICE_NAME = "tidb0"; + public static final String TIDB_USER = "root"; public static final String TIDB_PASSWORD = ""; + public static final int TIDB_PORT = 4000; + public static final int TIKV_PORT = 20160; public static final int PD_PORT = 2379; - public static final String TIDB_SERVICE_NAME = "tidb"; - public static final String TIKV_SERVICE_NAME = "tikv"; - public static final String PD_SERVICE_NAME = "pd"; - - public static final DockerComposeContainer TIDB_DOCKER_COMPOSE = - new DockerComposeContainer(new File("src/test/resources/docker/docker-compose.yml")) - .withExposedService( - TIDB_SERVICE_NAME + "_1", - TIDB_PORT, - Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(120))) - .withExposedService( - PD_SERVICE_NAME + "_1", - PD_PORT, - Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(120))) - .withLocalCompose(true); + @ClassRule public static final Network NETWORK = Network.newNetwork(); + + @ClassRule + public static final GenericContainer PD = + new FixedHostPortGenericContainer<>("pingcap/pd:v5.3.1") + .withFixedExposedPort(PD_PORT, PD_PORT) + .withFileSystemBind("src/test/resources/config/pd.toml", "/pd.toml") + .withCommand( + "--name=pd0", + "--client-urls=http://0.0.0.0:2379", + "--peer-urls=http://0.0.0.0:2380", + "--advertise-client-urls=http://pd0:2379", + "--advertise-peer-urls=http://pd0:2380", + "--initial-cluster=pd0=http://pd0:2380", + "--data-dir=/data/pd0", + "--config=/pd.toml", + "--log-file=/logs/pd0.log") + .withNetwork(NETWORK) + .withNetworkAliases(PD_SERVICE_NAME) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @ClassRule + public static final GenericContainer TIKV = + new FixedHostPortGenericContainer<>("pingcap/tikv:v5.3.1") + .withFixedExposedPort(TIKV_PORT, TIKV_PORT) + .withFileSystemBind("src/test/resources/config/tikv.toml", "/tikv.toml") + .withCommand( + "--addr=0.0.0.0:20160", + "--advertise-addr=tikv0:20160", + "--data-dir=/data/tikv0", + "--pd=pd0:2379", + "--config=/tikv.toml", + "--log-file=/logs/tikv0.log") + .withNetwork(NETWORK) + .dependsOn(PD) + .withNetworkAliases(TIKV_SERVICE_NAME) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @ClassRule + public static final GenericContainer TIDB = + new GenericContainer<>("pingcap/tidb:v5.3.1") + .withExposedPorts(TIDB_PORT) + .withFileSystemBind("src/test/resources/config/tidb.toml", "/tidb.toml") + .withCommand( + "--store=tikv", + "--path=pd0:2379", + "--config=/tidb.toml", + "--advertise-address=tidb0") + .withNetwork(NETWORK) + .dependsOn(TIKV) + .withNetworkAliases(TIDB_SERVICE_NAME) + .withStartupTimeout(Duration.ofSeconds(120)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); @BeforeClass public static void startContainers() throws Exception { LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(TIDB_DOCKER_COMPOSE)).join(); + Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join(); + // Add jvm dns cache for flink to invoke pd interface. + DnsCacheManipulator.setDnsCache(PD_SERVICE_NAME, "127.0.0.1"); + DnsCacheManipulator.setDnsCache(TIKV_SERVICE_NAME, "127.0.0.1"); LOG.info("Containers are started."); } @AfterClass public static void stopContainers() { - if (TIDB_DOCKER_COMPOSE != null) { - TIDB_DOCKER_COMPOSE.stop(); - } - } - - public String getTIDBHost() { - return TIDB_DOCKER_COMPOSE.getServiceHost(TIDB_SERVICE_NAME, TIDB_PORT); - } - - public Integer getTIDBPort() { - return TIDB_DOCKER_COMPOSE.getServicePort(TIDB_SERVICE_NAME, TIDB_PORT); - } - - public String getPDHost() { - return TIDB_DOCKER_COMPOSE.getServiceHost(PD_SERVICE_NAME, PD_PORT); - } - - public Integer getPDPort() { - return TIDB_DOCKER_COMPOSE.getServicePort(PD_SERVICE_NAME, PD_PORT); + Stream.of(TIKV, PD, TIDB).forEach(GenericContainer::stop); } public String getJdbcUrl(String databaseName) { - return "jdbc:mysql://" + getTIDBHost() + ":" + getTIDBPort() + "/" + databaseName; + return "jdbc:mysql://" + + TIDB.getContainerIpAddress() + + ":" + + TIDB.getMappedPort(TIDB_PORT) + + "/" + + databaseName; } protected Connection getJdbcConnection(String databaseName) throws SQLException { - return DriverManager.getConnection(getJdbcUrl(databaseName), TIDB_USER, TIDB_PASSWORD); } diff --git a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBConnectorITCase.java b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBConnectorITCase.java index afc0597b7..099105137 100644 --- a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBConnectorITCase.java +++ b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBConnectorITCase.java @@ -79,8 +79,8 @@ public class TiDBConnectorITCase extends TiDBTestBase { + " 'database-name' = '%s'," + " 'table-name' = '%s'" + ")", - getTIDBHost(), - getPDHost() + ":" + getPDPort(), + TIDB.getContainerIpAddress(), + PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT), TIDB_USER, TIDB_PASSWORD, "inventory", @@ -190,8 +190,8 @@ public class TiDBConnectorITCase extends TiDBTestBase { + " 'database-name' = '%s'," + " 'table-name' = '%s'" + ")", - getTIDBHost(), - getPDHost() + ":" + getPDPort(), + TIDB.getContainerIpAddress(), + PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT), TIDB_USER, TIDB_PASSWORD, "inventory", @@ -218,17 +218,14 @@ public class TiDBConnectorITCase extends TiDBTestBase { waitForSinkSize("sink", 9); try (Connection connection = getJdbcConnection("inventory"); - Statement statement = connection.createStatement()) { + Statement statement = connection.createStatement()) { statement.execute("ALTER TABLE products DROP COLUMN description"); statement.execute("UPDATE products SET weight='5.1' WHERE id=107;"); - statement.execute( - "INSERT INTO products VALUES (default,'jacket',0.2);"); // 110 - statement.execute( - "INSERT INTO products VALUES (default,'scooter',5.18);"); // 111 - statement.execute( - "UPDATE products SET name='jacket2', weight='0.5' WHERE id=110;"); + statement.execute("INSERT INTO products VALUES (default,'jacket',0.2);"); // 110 + statement.execute("INSERT INTO products VALUES (default,'scooter',5.18);"); // 111 + statement.execute("UPDATE products SET name='jacket2', weight='0.5' WHERE id=110;"); statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM products WHERE id=111;"); } @@ -278,8 +275,8 @@ public class TiDBConnectorITCase extends TiDBTestBase { + " 'database-name' = '%s'," + " 'table-name' = '%s'" + ")", - getTIDBHost(), - getPDHost() + ":" + getPDPort(), + TIDB.getContainerIpAddress(), + PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT), TIDB_USER, TIDB_PASSWORD, "inventory", @@ -306,7 +303,7 @@ public class TiDBConnectorITCase extends TiDBTestBase { waitForSinkSize("sink", 9); try (Connection connection = getJdbcConnection("inventory"); - Statement statement = connection.createStatement()) { + Statement statement = connection.createStatement()) { statement.execute("ALTER TABLE products ADD COLUMN serialnum INTEGER"); diff --git a/flink-connector-tidb-cdc/src/test/resources/docker/config/pd.toml b/flink-connector-tidb-cdc/src/test/resources/config/pd.toml similarity index 100% rename from flink-connector-tidb-cdc/src/test/resources/docker/config/pd.toml rename to flink-connector-tidb-cdc/src/test/resources/config/pd.toml diff --git a/flink-connector-tidb-cdc/src/test/resources/docker/config/tidb.toml b/flink-connector-tidb-cdc/src/test/resources/config/tidb.toml similarity index 100% rename from flink-connector-tidb-cdc/src/test/resources/docker/config/tidb.toml rename to flink-connector-tidb-cdc/src/test/resources/config/tidb.toml diff --git a/flink-connector-tidb-cdc/src/test/resources/docker/config/tikv.toml b/flink-connector-tidb-cdc/src/test/resources/config/tikv.toml similarity index 100% rename from flink-connector-tidb-cdc/src/test/resources/docker/config/tikv.toml rename to flink-connector-tidb-cdc/src/test/resources/config/tikv.toml diff --git a/flink-connector-tidb-cdc/src/test/resources/docker/docker-compose.yml b/flink-connector-tidb-cdc/src/test/resources/docker/docker-compose.yml deleted file mode 100644 index 00fa79958..000000000 --- a/flink-connector-tidb-cdc/src/test/resources/docker/docker-compose.yml +++ /dev/null @@ -1,65 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -version: "2.1" - -services: - pd: - image: pingcap/pd:latest - ports: - - "2379:2379" - volumes: - - ./config/pd.toml:/pd.toml - command: - - --client-urls=http://0.0.0.0:2379 - - --peer-urls=http://0.0.0.0:2380 - - --advertise-client-urls=http://192.168.30.157:2379 - - --advertise-peer-urls=http://pd:2380 - - --initial-cluster=pd=http://pd:2380 - - --data-dir=/data/pd - - --config=/pd.toml - - --log-file=/logs/pd.log - restart: on-failure - - tikv: - image: pingcap/tikv:latest - ports: - - "20160:20160" - volumes: - - ./config/tikv.toml:/tikv.toml - command: - - --addr=0.0.0.0:20160 - - --advertise-addr=192.168.30.157:20160 - - --data-dir=/data/tikv - - --pd=pd:2379 - - --config=/tikv.toml - - --log-file=/logs/tikv.log - depends_on: - - "pd" - restart: on-failure - - tidb: - image: pingcap/tidb:latest - volumes: - - ./config/tidb.toml:/tidb.toml - command: - - --store=tikv - - --path=pd:2379 - - --config=/tidb.toml - - --log-file=/logs/tidb.log - - --advertise-address=tidb - depends_on: - - "tikv" - restart: on-failure diff --git a/tools/azure-pipelines/jobs-template.yml b/tools/azure-pipelines/jobs-template.yml index a674f8c2f..f3f6ccf8d 100644 --- a/tools/azure-pipelines/jobs-template.yml +++ b/tools/azure-pipelines/jobs-template.yml @@ -84,6 +84,8 @@ jobs: module: mongodb sqlserver: module: sqlserver + tidb: + module: tidb e2e: module: e2e misc: diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh index c1b99dcf2..767fc5b24 100755 --- a/tools/ci/stage.sh +++ b/tools/ci/stage.sh @@ -21,6 +21,7 @@ STAGE_POSTGRES="postgres" STAGE_ORACLE="oracle" STAGE_MONGODB="mongodb" STAGE_SQLSERVER="sqlserver" +STAGE_TIDB="tidb" STAGE_E2E="e2e" STAGE_MISC="misc" @@ -44,6 +45,10 @@ MODULES_SQLSERVER="\ flink-connector-sqlserver-cdc,\ flink-sql-connector-sqlserver-cdc" +MODULES_TIDB="\ +flink-connector-tidb-cdc,\ +flink-sql-connector-tidb-cdc" + MODULES_E2E="\ flink-cdc-e2e-tests" @@ -66,6 +71,9 @@ function get_compile_modules_for_stage() { (${STAGE_SQLSERVER}) echo "-pl $MODULES_SQLSERVER -am" ;; + (${STAGE_TIDB}) + echo "-pl $MODULES_TIDB -am" + ;; (${STAGE_E2E}) # compile everything; using the -am switch does not work with negated module lists! # the negation takes precedence, thus not all required modules would be built @@ -87,14 +95,16 @@ function get_test_modules_for_stage() { local modules_oracle=$MODULES_ORACLE local modules_mongodb=$MODULES_MONGODB local modules_sqlserver=$MODULES_SQLSERVER + local modules_tidb=$MODULES_TIDB local modules_e2e=$MODULES_E2E local negated_mysql=\!${MODULES_MYSQL//,/,\!} local negated_postgres=\!${MODULES_POSTGRES//,/,\!} local negated_oracle=\!${MODULES_ORACLE//,/,\!} local negated_mongodb=\!${MODULES_MONGODB//,/,\!} local negated_sqlserver=\!${MODULES_SQLSERVER//,/,\!} + local negated_tidb=\!${MODULES_TIDB//,/,\!} local negated_e2e=\!${MODULES_E2E//,/,\!} - local modules_misc="$negated_mysql,$negated_postgres,$negated_oracle,$negated_mongodb,$negated_sqlserver,$negated_e2e" + local modules_misc="$negated_mysql,$negated_postgres,$negated_oracle,$negated_mongodb,$negated_sqlserver,$negated_tidb,$negated_e2e" case ${stage} in (${STAGE_MYSQL}) @@ -112,6 +122,9 @@ function get_test_modules_for_stage() { (${STAGE_SQLSERVER}) echo "-pl $modules_sqlserver" ;; + (${STAGE_TIDB}) + echo "-pl $modules_tidb" + ;; (${STAGE_E2E}) echo "-pl $modules_e2e" ;;