[tidb][test] Add tidb pipeline in azure CI

pull/898/head
gongzhongqiang 3 years ago committed by Leonard Xu
parent c51a35d7cd
commit 570cba1159

@ -175,6 +175,13 @@ under the License.
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dns-cache-manipulator</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>

@ -20,17 +20,20 @@ package com.ververica.cdc.connectors.tidb;
import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.test.util.AbstractTestBase;
import com.alibaba.dcm.DnsCacheManipulator;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException; import org.awaitility.core.ConditionTimeoutException;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables; import org.testcontainers.lifecycle.Startables;
import java.io.File;
import java.net.URL; import java.net.URL;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
@ -54,63 +57,98 @@ public class TiDBTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TiDBTestBase.class); private static final Logger LOG = LoggerFactory.getLogger(TiDBTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
public static final String PD_SERVICE_NAME = "pd0";
public static final String TIKV_SERVICE_NAME = "tikv0";
public static final String TIDB_SERVICE_NAME = "tidb0";
public static final String TIDB_USER = "root"; public static final String TIDB_USER = "root";
public static final String TIDB_PASSWORD = ""; public static final String TIDB_PASSWORD = "";
public static final int TIDB_PORT = 4000; public static final int TIDB_PORT = 4000;
public static final int TIKV_PORT = 20160;
public static final int PD_PORT = 2379; public static final int PD_PORT = 2379;
public static final String TIDB_SERVICE_NAME = "tidb"; @ClassRule public static final Network NETWORK = Network.newNetwork();
public static final String TIKV_SERVICE_NAME = "tikv";
public static final String PD_SERVICE_NAME = "pd"; @ClassRule
public static final GenericContainer<?> PD =
public static final DockerComposeContainer TIDB_DOCKER_COMPOSE = new FixedHostPortGenericContainer<>("pingcap/pd:v5.3.1")
new DockerComposeContainer(new File("src/test/resources/docker/docker-compose.yml")) .withFixedExposedPort(PD_PORT, PD_PORT)
.withExposedService( .withFileSystemBind("src/test/resources/config/pd.toml", "/pd.toml")
TIDB_SERVICE_NAME + "_1", .withCommand(
TIDB_PORT, "--name=pd0",
Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(120))) "--client-urls=http://0.0.0.0:2379",
.withExposedService( "--peer-urls=http://0.0.0.0:2380",
PD_SERVICE_NAME + "_1", "--advertise-client-urls=http://pd0:2379",
PD_PORT, "--advertise-peer-urls=http://pd0:2380",
Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(120))) "--initial-cluster=pd0=http://pd0:2380",
.withLocalCompose(true); "--data-dir=/data/pd0",
"--config=/pd.toml",
"--log-file=/logs/pd0.log")
.withNetwork(NETWORK)
.withNetworkAliases(PD_SERVICE_NAME)
.withStartupTimeout(Duration.ofSeconds(120))
.withLogConsumer(new Slf4jLogConsumer(LOG));
@ClassRule
public static final GenericContainer<?> TIKV =
new FixedHostPortGenericContainer<>("pingcap/tikv:v5.3.1")
.withFixedExposedPort(TIKV_PORT, TIKV_PORT)
.withFileSystemBind("src/test/resources/config/tikv.toml", "/tikv.toml")
.withCommand(
"--addr=0.0.0.0:20160",
"--advertise-addr=tikv0:20160",
"--data-dir=/data/tikv0",
"--pd=pd0:2379",
"--config=/tikv.toml",
"--log-file=/logs/tikv0.log")
.withNetwork(NETWORK)
.dependsOn(PD)
.withNetworkAliases(TIKV_SERVICE_NAME)
.withStartupTimeout(Duration.ofSeconds(120))
.withLogConsumer(new Slf4jLogConsumer(LOG));
@ClassRule
public static final GenericContainer<?> TIDB =
new GenericContainer<>("pingcap/tidb:v5.3.1")
.withExposedPorts(TIDB_PORT)
.withFileSystemBind("src/test/resources/config/tidb.toml", "/tidb.toml")
.withCommand(
"--store=tikv",
"--path=pd0:2379",
"--config=/tidb.toml",
"--advertise-address=tidb0")
.withNetwork(NETWORK)
.dependsOn(TIKV)
.withNetworkAliases(TIDB_SERVICE_NAME)
.withStartupTimeout(Duration.ofSeconds(120))
.withLogConsumer(new Slf4jLogConsumer(LOG));
@BeforeClass @BeforeClass
public static void startContainers() throws Exception { public static void startContainers() throws Exception {
LOG.info("Starting containers..."); LOG.info("Starting containers...");
Startables.deepStart(Stream.of(TIDB_DOCKER_COMPOSE)).join(); Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join();
// Add jvm dns cache for flink to invoke pd interface.
DnsCacheManipulator.setDnsCache(PD_SERVICE_NAME, "127.0.0.1");
DnsCacheManipulator.setDnsCache(TIKV_SERVICE_NAME, "127.0.0.1");
LOG.info("Containers are started."); LOG.info("Containers are started.");
} }
@AfterClass @AfterClass
public static void stopContainers() { public static void stopContainers() {
if (TIDB_DOCKER_COMPOSE != null) { Stream.of(TIKV, PD, TIDB).forEach(GenericContainer::stop);
TIDB_DOCKER_COMPOSE.stop();
}
}
public String getTIDBHost() {
return TIDB_DOCKER_COMPOSE.getServiceHost(TIDB_SERVICE_NAME, TIDB_PORT);
}
public Integer getTIDBPort() {
return TIDB_DOCKER_COMPOSE.getServicePort(TIDB_SERVICE_NAME, TIDB_PORT);
}
public String getPDHost() {
return TIDB_DOCKER_COMPOSE.getServiceHost(PD_SERVICE_NAME, PD_PORT);
}
public Integer getPDPort() {
return TIDB_DOCKER_COMPOSE.getServicePort(PD_SERVICE_NAME, PD_PORT);
} }
public String getJdbcUrl(String databaseName) { public String getJdbcUrl(String databaseName) {
return "jdbc:mysql://" + getTIDBHost() + ":" + getTIDBPort() + "/" + databaseName; return "jdbc:mysql://"
+ TIDB.getContainerIpAddress()
+ ":"
+ TIDB.getMappedPort(TIDB_PORT)
+ "/"
+ databaseName;
} }
protected Connection getJdbcConnection(String databaseName) throws SQLException { protected Connection getJdbcConnection(String databaseName) throws SQLException {
return DriverManager.getConnection(getJdbcUrl(databaseName), TIDB_USER, TIDB_PASSWORD); return DriverManager.getConnection(getJdbcUrl(databaseName), TIDB_USER, TIDB_PASSWORD);
} }

@ -79,8 +79,8 @@ public class TiDBConnectorITCase extends TiDBTestBase {
+ " 'database-name' = '%s'," + " 'database-name' = '%s',"
+ " 'table-name' = '%s'" + " 'table-name' = '%s'"
+ ")", + ")",
getTIDBHost(), TIDB.getContainerIpAddress(),
getPDHost() + ":" + getPDPort(), PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT),
TIDB_USER, TIDB_USER,
TIDB_PASSWORD, TIDB_PASSWORD,
"inventory", "inventory",
@ -190,8 +190,8 @@ public class TiDBConnectorITCase extends TiDBTestBase {
+ " 'database-name' = '%s'," + " 'database-name' = '%s',"
+ " 'table-name' = '%s'" + " 'table-name' = '%s'"
+ ")", + ")",
getTIDBHost(), TIDB.getContainerIpAddress(),
getPDHost() + ":" + getPDPort(), PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT),
TIDB_USER, TIDB_USER,
TIDB_PASSWORD, TIDB_PASSWORD,
"inventory", "inventory",
@ -218,17 +218,14 @@ public class TiDBConnectorITCase extends TiDBTestBase {
waitForSinkSize("sink", 9); waitForSinkSize("sink", 9);
try (Connection connection = getJdbcConnection("inventory"); try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
statement.execute("ALTER TABLE products DROP COLUMN description"); statement.execute("ALTER TABLE products DROP COLUMN description");
statement.execute("UPDATE products SET weight='5.1' WHERE id=107;"); statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
statement.execute( statement.execute("INSERT INTO products VALUES (default,'jacket',0.2);"); // 110
"INSERT INTO products VALUES (default,'jacket',0.2);"); // 110 statement.execute("INSERT INTO products VALUES (default,'scooter',5.18);"); // 111
statement.execute( statement.execute("UPDATE products SET name='jacket2', weight='0.5' WHERE id=110;");
"INSERT INTO products VALUES (default,'scooter',5.18);"); // 111
statement.execute(
"UPDATE products SET name='jacket2', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;"); statement.execute("DELETE FROM products WHERE id=111;");
} }
@ -278,8 +275,8 @@ public class TiDBConnectorITCase extends TiDBTestBase {
+ " 'database-name' = '%s'," + " 'database-name' = '%s',"
+ " 'table-name' = '%s'" + " 'table-name' = '%s'"
+ ")", + ")",
getTIDBHost(), TIDB.getContainerIpAddress(),
getPDHost() + ":" + getPDPort(), PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT),
TIDB_USER, TIDB_USER,
TIDB_PASSWORD, TIDB_PASSWORD,
"inventory", "inventory",
@ -306,7 +303,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
waitForSinkSize("sink", 9); waitForSinkSize("sink", 9);
try (Connection connection = getJdbcConnection("inventory"); try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
statement.execute("ALTER TABLE products ADD COLUMN serialnum INTEGER"); statement.execute("ALTER TABLE products ADD COLUMN serialnum INTEGER");

@ -1,65 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "2.1"
services:
pd:
image: pingcap/pd:latest
ports:
- "2379:2379"
volumes:
- ./config/pd.toml:/pd.toml
command:
- --client-urls=http://0.0.0.0:2379
- --peer-urls=http://0.0.0.0:2380
- --advertise-client-urls=http://192.168.30.157:2379
- --advertise-peer-urls=http://pd:2380
- --initial-cluster=pd=http://pd:2380
- --data-dir=/data/pd
- --config=/pd.toml
- --log-file=/logs/pd.log
restart: on-failure
tikv:
image: pingcap/tikv:latest
ports:
- "20160:20160"
volumes:
- ./config/tikv.toml:/tikv.toml
command:
- --addr=0.0.0.0:20160
- --advertise-addr=192.168.30.157:20160
- --data-dir=/data/tikv
- --pd=pd:2379
- --config=/tikv.toml
- --log-file=/logs/tikv.log
depends_on:
- "pd"
restart: on-failure
tidb:
image: pingcap/tidb:latest
volumes:
- ./config/tidb.toml:/tidb.toml
command:
- --store=tikv
- --path=pd:2379
- --config=/tidb.toml
- --log-file=/logs/tidb.log
- --advertise-address=tidb
depends_on:
- "tikv"
restart: on-failure

@ -84,6 +84,8 @@ jobs:
module: mongodb module: mongodb
sqlserver: sqlserver:
module: sqlserver module: sqlserver
tidb:
module: tidb
e2e: e2e:
module: e2e module: e2e
misc: misc:

@ -21,6 +21,7 @@ STAGE_POSTGRES="postgres"
STAGE_ORACLE="oracle" STAGE_ORACLE="oracle"
STAGE_MONGODB="mongodb" STAGE_MONGODB="mongodb"
STAGE_SQLSERVER="sqlserver" STAGE_SQLSERVER="sqlserver"
STAGE_TIDB="tidb"
STAGE_E2E="e2e" STAGE_E2E="e2e"
STAGE_MISC="misc" STAGE_MISC="misc"
@ -44,6 +45,10 @@ MODULES_SQLSERVER="\
flink-connector-sqlserver-cdc,\ flink-connector-sqlserver-cdc,\
flink-sql-connector-sqlserver-cdc" flink-sql-connector-sqlserver-cdc"
MODULES_TIDB="\
flink-connector-tidb-cdc,\
flink-sql-connector-tidb-cdc"
MODULES_E2E="\ MODULES_E2E="\
flink-cdc-e2e-tests" flink-cdc-e2e-tests"
@ -66,6 +71,9 @@ function get_compile_modules_for_stage() {
(${STAGE_SQLSERVER}) (${STAGE_SQLSERVER})
echo "-pl $MODULES_SQLSERVER -am" echo "-pl $MODULES_SQLSERVER -am"
;; ;;
(${STAGE_TIDB})
echo "-pl $MODULES_TIDB -am"
;;
(${STAGE_E2E}) (${STAGE_E2E})
# compile everything; using the -am switch does not work with negated module lists! # compile everything; using the -am switch does not work with negated module lists!
# the negation takes precedence, thus not all required modules would be built # the negation takes precedence, thus not all required modules would be built
@ -87,14 +95,16 @@ function get_test_modules_for_stage() {
local modules_oracle=$MODULES_ORACLE local modules_oracle=$MODULES_ORACLE
local modules_mongodb=$MODULES_MONGODB local modules_mongodb=$MODULES_MONGODB
local modules_sqlserver=$MODULES_SQLSERVER local modules_sqlserver=$MODULES_SQLSERVER
local modules_tidb=$MODULES_TIDB
local modules_e2e=$MODULES_E2E local modules_e2e=$MODULES_E2E
local negated_mysql=\!${MODULES_MYSQL//,/,\!} local negated_mysql=\!${MODULES_MYSQL//,/,\!}
local negated_postgres=\!${MODULES_POSTGRES//,/,\!} local negated_postgres=\!${MODULES_POSTGRES//,/,\!}
local negated_oracle=\!${MODULES_ORACLE//,/,\!} local negated_oracle=\!${MODULES_ORACLE//,/,\!}
local negated_mongodb=\!${MODULES_MONGODB//,/,\!} local negated_mongodb=\!${MODULES_MONGODB//,/,\!}
local negated_sqlserver=\!${MODULES_SQLSERVER//,/,\!} local negated_sqlserver=\!${MODULES_SQLSERVER//,/,\!}
local negated_tidb=\!${MODULES_TIDB//,/,\!}
local negated_e2e=\!${MODULES_E2E//,/,\!} local negated_e2e=\!${MODULES_E2E//,/,\!}
local modules_misc="$negated_mysql,$negated_postgres,$negated_oracle,$negated_mongodb,$negated_sqlserver,$negated_e2e" local modules_misc="$negated_mysql,$negated_postgres,$negated_oracle,$negated_mongodb,$negated_sqlserver,$negated_tidb,$negated_e2e"
case ${stage} in case ${stage} in
(${STAGE_MYSQL}) (${STAGE_MYSQL})
@ -112,6 +122,9 @@ function get_test_modules_for_stage() {
(${STAGE_SQLSERVER}) (${STAGE_SQLSERVER})
echo "-pl $modules_sqlserver" echo "-pl $modules_sqlserver"
;; ;;
(${STAGE_TIDB})
echo "-pl $modules_tidb"
;;
(${STAGE_E2E}) (${STAGE_E2E})
echo "-pl $modules_e2e" echo "-pl $modules_e2e"
;; ;;

Loading…
Cancel
Save