diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml
index 3c6d3d198..9ddb4da10 100644
--- a/flink-cdc-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/pom.xml
@@ -84,6 +84,13 @@ under the License.
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-tidb-cdc</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-connector-test-util</artifactId>
@@ -253,6 +260,16 @@ under the License.
                                 <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                             </artifactItem>
+                            <artifactItem>
+                                <groupId>com.ververica</groupId>
+                                <artifactId>flink-sql-connector-tidb-cdc</artifactId>
+                                <version>${project.version}</version>
+                                <destFileName>tidb-cdc-connector.jar</destFileName>
+                                <type>jar</type>
+                                <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                            </artifactItem>
diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/TIDBE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/TIDBE2eITCase.java
new file mode 100644
index 000000000..ca2148db3
--- /dev/null
+++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/TIDBE2eITCase.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.tests;
+
+import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
+import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
+import com.ververica.cdc.connectors.tests.utils.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** End-to-end tests for tidb-cdc connector uber jar. */
+public class TIDBE2eITCase extends FlinkContainerTestEnvironment {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TIDBE2eITCase.class);
+ private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+
+ public static final String PD_SERVICE_NAME = "pd0";
+ public static final String TIKV_SERVICE_NAME = "tikv0";
+ public static final String TIDB_SERVICE_NAME = "tidb0";
+
+ public static final String TIDB_USER = "root";
+ public static final String TIDB_PASSWORD = "";
+
+ public static final int TIDB_PORT = 4000;
+ public static final int TIKV_PORT = 20160;
+ public static final int PD_PORT = 2379;
+
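+ // Connector uber jar and MySQL driver jar staged under target/dependencies by the
+ // maven-dependency-plugin configuration added in the e2e pom above.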
+ private static final Path tidbCdcJar = TestUtils.getResource("tidb-cdc-connector.jar");
+ private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+
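+ // A minimal TiDB cluster: one PD (Placement Driver) node for metadata and scheduling,
+ // one TiKV node for storage, and one TiDB node speaking the MySQL protocol.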
+ @ClassRule
+ public static final GenericContainer<?> PD =
+ new GenericContainer<>("pingcap/pd:v5.3.1")
+ .withExposedPorts(PD_PORT)
+ .withFileSystemBind("src/test/resources/docker/tidb/pd.toml", "/pd.toml")
+ .withCommand(
+ "--name=pd0",
+ "--client-urls=http://0.0.0.0:2379",
+ "--peer-urls=http://0.0.0.0:2380",
+ "--advertise-client-urls=http://pd0:2379",
+ "--advertise-peer-urls=http://pd0:2380",
+ "--initial-cluster=pd0=http://pd0:2380",
+ "--data-dir=/data/pd0",
+ "--config=/pd.toml",
+ "--log-file=/logs/pd0.log")
+ .withNetwork(NETWORK)
+ .withNetworkMode("host")
+ .withNetworkAliases(PD_SERVICE_NAME)
+ .withStartupTimeout(Duration.ofSeconds(120))
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
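+ // TiKV registers itself with PD at startup, so it must start after the PD container.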
+ @ClassRule
+ public static final GenericContainer<?> TIKV =
+ new GenericContainer<>("pingcap/tikv:v5.3.1")
+ .withExposedPorts(TIKV_PORT)
+ .withFileSystemBind("src/test/resources/docker/tidb/tikv.toml", "/tikv.toml")
+ .withCommand(
+ "--addr=0.0.0.0:20160",
+ "--advertise-addr=tikv0:20160",
+ "--data-dir=/data/tikv0",
+ "--pd=pd0:2379",
+ "--config=/tikv.toml",
+ "--log-file=/logs/tikv0.log")
+ .withNetwork(NETWORK)
+ .dependsOn(PD)
+ .withNetworkAliases(TIKV_SERVICE_NAME)
+ .withStartupTimeout(Duration.ofSeconds(120))
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
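+ // TiDB is the stateless SQL layer; it reaches the cluster through PD and serves MySQL clients on port 4000.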
+ @ClassRule
+ public static final GenericContainer<?> TIDB =
+ new GenericContainer<>("pingcap/tidb:v5.3.1")
+ .withExposedPorts(TIDB_PORT)
+ .withFileSystemBind("src/test/resources/docker/tidb/tidb.toml", "/tidb.toml")
+ .withCommand(
+ "--store=tikv",
+ "--path=pd0:2379",
+ "--config=/tidb.toml",
+ "--advertise-address=tidb0")
+ .withNetwork(NETWORK)
+ .dependsOn(TIKV)
+ .withNetworkAliases(TIDB_SERVICE_NAME)
+ .withStartupTimeout(Duration.ofSeconds(120))
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ @Before
+ public void before() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(PD, TIKV, TIDB)).join();
+ LOG.info("Containers are started.");
+ super.before();
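+ // Create the inventory database and seed the products table from ddl/tidb_inventory.sql.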
+ initializeTidbTable("tidb_inventory");
+ }
+
+ @After
+ public void after() {
+ super.after();
+ if (TIDB != null) {
+ TIDB.stop();
+ }
+ if (TIKV != null) {
+ TIKV.stop();
+ }
+ if (PD != null) {
+ PD.stop();
+ }
+ }
+
+ @Test
+ public void testTIDBCDC() throws Exception {
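+ // The Flink SQL job reads change events from TiDB via the tidb-cdc connector
+ // and writes them into a MySQL sink table over JDBC.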
+ List<String> sqlLines =
+ Arrays.asList(
+ "CREATE TABLE tidb_source (",
+ " `id` INT NOT NULL,",
+ " name STRING,",
+ " description STRING,",
+ " weight DECIMAL(20, 10),",
+ " PRIMARY KEY (`id`) NOT ENFORCED",
+ ") WITH (",
+ " 'connector' = 'tidb-cdc',",
+ " 'hostname' = '" + TIDB_SERVICE_NAME + "',",
+ " 'tikv.grpc.timeout_in_ms' = '20000',",
+ " 'pd-addresses' = '" + PD_SERVICE_NAME + ":" + PD_PORT + "',",
+ " 'username' = '" + TIDB_USER + "',",
+ " 'password' = '" + TIDB_PASSWORD + "',",
+ " 'database-name' = 'inventory',",
+ " 'table-name' = 'products'",
+ ");",
+ "CREATE TABLE products_sink (",
+ " `id` INT NOT NULL,",
+ " name STRING,",
+ " description STRING,",
+ " weight DECIMAL(10,3),",
+ " primary key (`id`) not enforced",
+ ") WITH (",
+ " 'connector' = 'jdbc',",
+ String.format(
+ " 'url' = 'jdbc:mysql://%s:3306/%s',",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ mysqlInventoryDatabase.getDatabaseName()),
+ " 'table-name' = 'products_sink',",
+ " 'username' = '" + MYSQL_TEST_USER + "',",
+ " 'password' = '" + MYSQL_TEST_PASSWORD + "'",
+ ");",
+ "INSERT INTO products_sink",
+ "SELECT * FROM tidb_source;");
+
+ submitSQLJob(sqlLines, tidbCdcJar, jdbcJar, mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+
+ // generate binlogs
+ try (Connection connection = getTidbJdbcConnection("inventory");
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+ statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+ statement.execute(
+ "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
+ statement.execute(
+ "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
+ statement.execute(
+ "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+ statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
+ statement.execute("DELETE FROM products WHERE id=111;");
+
+ ResultSet resultSet = statement.executeQuery("SELECT count(1) FROM products");
+ int recordCount = 0;
+ while (resultSet.next()) {
+ recordCount = resultSet.getInt(1);
+ }
+ assertEquals(10, recordCount);
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ // assert final results
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(),
+ MYSQL.getDatabasePort(),
+ mysqlInventoryDatabase.getDatabaseName());
+ JdbcProxy proxy =
+ new JdbcProxy(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS);
+ List expectResult =
+ Arrays.asList(
+ "101,scooter,Small 2-wheel scooter,3.14",
+ "102,car battery,12V car battery,8.1",
+ "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8",
+ "104,hammer,12oz carpenter's hammer,0.75",
+ "105,hammer,14oz carpenter's hammer,0.875",
+ "106,hammer,18oz carpenter hammer,1.0",
+ "107,rocks,box of assorted rocks,5.1",
+ "108,jacket,water resistent black wind breaker,0.1",
+ "109,spare tire,24 inch spare tire,22.2",
+ "110,jacket,new water resistent white wind breaker,0.5");
+ proxy.checkResultWithTimeout(
+ expectResult,
+ "products_sink",
+ new String[] {"id", "name", "description", "weight"},
+ 360000L);
+ }
+
+ protected Connection getTidbJdbcConnection(String databaseName) throws SQLException {
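+ // TiDB is compatible with the MySQL protocol, so a plain MySQL JDBC URL against the mapped container port works here.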
+ return DriverManager.getConnection(
+ "jdbc:mysql://"
+ + TIDB.getContainerIpAddress()
+ + ":"
+ + TIDB.getMappedPort(TIDB_PORT)
+ + "/"
+ + databaseName,
+ TIDB_USER,
+ TIDB_PASSWORD);
+ }
+
+ /**
+ * Initializes the TiDB database by executing the DDL statements from the given SQL file on the
+ * classpath.
+ */
+ protected void initializeTidbTable(String sqlFile) {
+ final String ddlFile = String.format("ddl/%s.sql", sqlFile);
+ final URL ddlTestFile = TIDBE2eITCase.class.getClassLoader().getResource(ddlFile);
+ assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+ try (Connection connection = getTidbJdbcConnection("");
+ Statement statement = connection.createStatement()) {
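+ // Strip SQL comments and blank lines, then split the remaining script into individual statements on ';'.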
+ final List<String> statements =
+ Arrays.stream(
+ Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
+ .map(String::trim)
+ .filter(x -> !x.startsWith("--") && !x.isEmpty())
+ .map(
+ x -> {
+ final Matcher m =
+ COMMENT_PATTERN.matcher(x);
+ return m.matches() ? m.group(1) : x;
+ })
+ .collect(Collectors.joining("\n"))
+ .split(";"))
+ .collect(Collectors.toList());
+ for (String stmt : statements) {
+ statement.execute(stmt);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/flink-cdc-e2e-tests/src/test/resources/ddl/tidb_inventory.sql b/flink-cdc-e2e-tests/src/test/resources/ddl/tidb_inventory.sql
new file mode 100644
index 000000000..5f9435aac
--- /dev/null
+++ b/flink-cdc-e2e-tests/src/test/resources/ddl/tidb_inventory.sql
@@ -0,0 +1,44 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: inventory
+-- ----------------------------------------------------------------------------------------------------------------
+-- Create and populate our products using a single insert with many rows
+DROP DATABASE IF EXISTS inventory;
+
+CREATE DATABASE inventory;
+
+USE inventory;
+
+CREATE TABLE products
+(
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ description VARCHAR(512),
+ weight DECIMAL(20, 10)
+);
+ALTER TABLE products AUTO_INCREMENT = 101;
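+-- Auto-increment ids start at 101 so they match the ids asserted in TIDBE2eITCase.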
+
+INSERT INTO products
+VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
+ (default, "car battery", "12V car battery", 8.1),
+ (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8),
+ (default, "hammer", "12oz carpenter's hammer", 0.75),
+ (default, "hammer", "14oz carpenter's hammer", 0.875),
+ (default, "hammer", "16oz carpenter's hammer", 1.0),
+ (default, "rocks", "box of assorted rocks", 5.3),
+ (default, "jacket", "water resistent black wind breaker", 0.1),
+ (default, "spare tire", "24 inch spare tire", 22.2);
diff --git a/flink-cdc-e2e-tests/src/test/resources/docker/tidb/pd.toml b/flink-cdc-e2e-tests/src/test/resources/docker/tidb/pd.toml
new file mode 100644
index 000000000..7c6bf65a7
--- /dev/null
+++ b/flink-cdc-e2e-tests/src/test/resources/docker/tidb/pd.toml
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# PD Configuration.
+
+name = "pd"
+data-dir = "default.pd"
+
+client-urls = "http://127.0.0.1:2379"
+# if not set, use ${client-urls}
+advertise-client-urls = ""
+
+peer-urls = "http://127.0.0.1:2380"
+# if not set, use ${peer-urls}
+advertise-peer-urls = ""
+
+initial-cluster = "pd=http://127.0.0.1:2380"
+initial-cluster-state = "new"
+
+lease = 3
+tso-save-interval = "3s"
+
+[security]
+# Path of the file that contains the list of trusted SSL CAs. If set, the following settings must not be empty.
+cacert-path = ""
+# Path of file that contains X509 certificate in PEM format.
+cert-path = ""
+# Path of file that contains X509 key in PEM format.
+key-path = ""
+
+[log]
+level = "error"
+
+# log format, one of json, text, console
+#format = "text"
+
+# disable automatic timestamps in output
+#disable-timestamp = false
+
+# file logging
+[log.file]
+#filename = ""
+# max log file size in MB
+#max-size = 300
+# max log file keep days
+#max-days = 28
+# maximum number of old log files to retain
+#max-backups = 7
+# rotate log by day
+#log-rotate = true
+
+[metric]
+# prometheus client push interval, set "0s" to disable prometheus.
+interval = "15s"
+# The Prometheus pushgateway address. Leaving it empty disables pushing to Prometheus.
+address = "pushgateway:9091"
+
+[schedule]
+max-merge-region-size = 0
+split-merge-interval = "1h"
+max-snapshot-count = 3
+max-pending-peer-count = 16
+max-store-down-time = "30m"
+leader-schedule-limit = 4
+region-schedule-limit = 4
+replica-schedule-limit = 8
+merge-schedule-limit = 8
+tolerant-size-ratio = 5.0
+
+# customized schedulers, the format is as below
+# if empty, it will use balance-leader, balance-region, hot-region as default
+# [[schedule.schedulers]]
+# type = "evict-leader"
+# args = ["1"]
+
+[replication]
+# The number of replicas for each region.
+max-replicas = 3
+# The label keys specified the location of a store.
+# The placement priorities is implied by the order of label keys.
+# For example, ["zone", "rack"] means that we should place replicas to
+# different zones first, then to different racks if we don't have enough zones.
+location-labels = []
+
+[label-property]
+# Do not assign region leaders to stores that have these tags.
+# [[label-property.reject-leader]]
+# key = "zone"
+# value = "cn1"
diff --git a/flink-cdc-e2e-tests/src/test/resources/docker/tidb/tidb.toml b/flink-cdc-e2e-tests/src/test/resources/docker/tidb/tidb.toml
new file mode 100644
index 000000000..be13bd3c5
--- /dev/null
+++ b/flink-cdc-e2e-tests/src/test/resources/docker/tidb/tidb.toml
@@ -0,0 +1,233 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TiDB Configuration.
+
+# TiDB server host.
+host = "0.0.0.0"
+
+# TiDB server port.
+port = 4000
+
+# Registered store name, [tikv, mocktikv]
+store = "tikv"
+
+# TiDB storage path.
+path = "/tmp/tidb"
+
+# The socket file to use for connection.
+socket = ""
+
+# Run ddl worker on this tidb-server.
+run-ddl = true
+
+# Schema lease duration. Changing it is very dangerous; only do so if you know what you are doing.
+lease = "0"
+
+# When creating a table, split a separate region for it. It is recommended to
+# turn off this option if there will be a large number of tables created.
+split-table = true
+
+# The limit of concurrent executed sessions.
+token-limit = 1000
+
+# Only print a log when out of memory quota.
+# Valid options: ["log", "cancel"]
+oom-action = "log"
+
+# Set the memory quota for a query in bytes. Default: 32GB
+mem-quota-query = 34359738368
+
+# Set system variable 'lower_case_table_names'
+lower-case-table-names = 2
+
+[log]
+# Log level: debug, info, warn, error, fatal.
+level = "error"
+
+# Log format, one of json, text, console.
+format = "text"
+
+# Disable automatic timestamp in output
+disable-timestamp = false
+
+# Stores slow query log into separated files.
+slow-query-file = ""
+
+# Queries with execution time greater than this value will be logged. (Milliseconds)
+slow-threshold = 300
+
+# Queries with internal result greater than this value will be logged.
+expensive-threshold = 10000
+
+# Maximum query length recorded in log.
+query-log-max-len = 2048
+
+# File logging.
+[log.file]
+# Log file name.
+filename = ""
+
+# Max log file size in MB (upper limit to 4096MB).
+max-size = 300
+
+# Max log file keep days. No clean up by default.
+max-days = 0
+
+# Maximum number of old log files to retain. No clean up by default.
+max-backups = 0
+
+[security]
+# Path of file that contains list of trusted SSL CAs for connection with mysql client.
+ssl-ca = ""
+
+# Path of file that contains X509 certificate in PEM format for connection with mysql client.
+ssl-cert = ""
+
+# Path of file that contains X509 key in PEM format for connection with mysql client.
+ssl-key = ""
+
+# Path of file that contains list of trusted SSL CAs for connection with cluster components.
+cluster-ssl-ca = ""
+
+# Path of file that contains X509 certificate in PEM format for connection with cluster components.
+cluster-ssl-cert = ""
+
+# Path of file that contains X509 key in PEM format for connection with cluster components.
+cluster-ssl-key = ""
+
+[status]
+# Whether to enable the status report HTTP service.
+report-status = true
+
+# TiDB status port.
+status-port = 10080
+
+# Prometheus client push interval in seconds; set "0" to disable the Prometheus push.
+metrics-interval = 15
+
+[performance]
+# Max CPUs to use, 0 use number of CPUs in the machine.
+max-procs = 0
+# StmtCountLimit limits the max count of statement inside a transaction.
+stmt-count-limit = 5000
+
+# Set keep alive option for tcp connection.
+tcp-keep-alive = true
+
+# Whether support cartesian product.
+cross-join = true
+
+# Stats lease duration, which influences the time of analyze and stats load.
+stats-lease = "3s"
+
+# Run auto analyze worker on this tidb-server.
+run-auto-analyze = true
+
+# Probability to use the query feedback to update stats, 0 or 1 for always false/true.
+feedback-probability = 0.0
+
+# The max number of query feedback that cache in memory.
+query-feedback-limit = 1024
+
+# Pseudo stats will be used if the ratio between the modify count and
+# row count in statistics of a table is greater than it.
+pseudo-estimate-ratio = 0.7
+
+[proxy-protocol]
+# PROXY protocol acceptable client networks.
+# Empty string means disable PROXY protocol, * means all networks.
+networks = ""
+
+# PROXY protocol header read timeout, unit is second
+header-timeout = 5
+
+[opentracing]
+# Enable opentracing.
+enable = false
+
+# Whether to enable the rpc metrics.
+rpc-metrics = false
+
+[opentracing.sampler]
+# Type specifies the type of the sampler: const, probabilistic, rateLimiting, or remote
+type = "const"
+
+# Param is a value passed to the sampler.
+# Valid values for Param field are:
+# - for "const" sampler, 0 or 1 for always false/true respectively
+# - for "probabilistic" sampler, a probability between 0 and 1
+# - for "rateLimiting" sampler, the number of spans per second
+# - for "remote" sampler, param is the same as for "probabilistic"
+# and indicates the initial sampling rate before the actual one
+# is received from the mothership
+param = 1.0
+
+# SamplingServerURL is the address of jaeger-agent's HTTP sampling server
+sampling-server-url = ""
+
+# MaxOperations is the maximum number of operations that the sampler
+# will keep track of. If an operation is not tracked, a default probabilistic
+# sampler will be used rather than the per operation specific sampler.
+max-operations = 0
+
+# SamplingRefreshInterval controls how often the remotely controlled sampler will poll
+# jaeger-agent for the appropriate sampling strategy.
+sampling-refresh-interval = 0
+
+[opentracing.reporter]
+# QueueSize controls how many spans the reporter can keep in memory before it starts dropping
+# new spans. The queue is continuously drained by a background go-routine, as fast as spans
+# can be sent out of process.
+queue-size = 0
+
+# BufferFlushInterval controls how often the buffer is force-flushed, even if it's not full.
+# It is generally not useful, as it only matters for very low traffic services.
+buffer-flush-interval = 0
+
+# LogSpans, when true, enables LoggingReporter that runs in parallel with the main reporter
+# and logs all submitted spans. Main Configuration.Logger must be initialized in the code
+# for this option to have any effect.
+log-spans = false
+
+# LocalAgentHostPort instructs reporter to send spans to jaeger-agent at this address
+local-agent-host-port = ""
+
+[tikv-client]
+# Max gRPC connections that will be established with each tikv-server.
+grpc-connection-count = 16
+
+# After a duration of this time in seconds if the client doesn't see any activity it pings
+# the server to see if the transport is still alive.
+grpc-keepalive-time = 10
+
+# After having pinged for keepalive check, the client waits for a duration of Timeout in seconds
+# and if no activity is seen even after that the connection is closed.
+grpc-keepalive-timeout = 3
+
+# Max time for the commit command; must be at least twice the raft election timeout.
+commit-timeout = "41s"
+
+[binlog]
+
+# Socket file to write binlog.
+binlog-socket = ""
+
+# WriteTimeout specifies how long it will wait for writing binlog to pump.
+write-timeout = "15s"
+
+# If ignore-error is true, TiDB stops writing the binlog when a write error occurs,
+# but continues to provide service.
+ignore-error = false
diff --git a/flink-cdc-e2e-tests/src/test/resources/docker/tidb/tikv.toml b/flink-cdc-e2e-tests/src/test/resources/docker/tidb/tikv.toml
new file mode 100644
index 000000000..5e7cb57a9
--- /dev/null
+++ b/flink-cdc-e2e-tests/src/test/resources/docker/tidb/tikv.toml
@@ -0,0 +1,513 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TiKV config template
+# Human-readable big numbers:
+# File size(based on byte): KB, MB, GB, TB, PB
+# e.g.: 1_048_576 = "1MB"
+# Time(based on ms): ms, s, m, h
+# e.g.: 78_000 = "1.3m"
+
+# log level: trace, debug, info, warn, error, off.
+log-level = "error"
+# file to store log, write to stderr if it's empty.
+# log-file = ""
+log-rotation-size="500MB"
+
+[readpool.storage]
+# size of thread pool for high-priority operations
+# high-concurrency = 4
+# size of thread pool for normal-priority operations
+# normal-concurrency = 4
+# size of thread pool for low-priority operations
+# low-concurrency = 4
+# max running high-priority operations, reject if exceed
+# max-tasks-high = 8000
+# max running normal-priority operations, reject if exceed
+# max-tasks-normal = 8000
+# max running low-priority operations, reject if exceed
+# max-tasks-low = 8000
+# size of stack size for each thread pool
+# stack-size = "10MB"
+
+[readpool.coprocessor]
+# Notice: if CPU_NUM > 8, default thread pool size for coprocessors
+# will be set to CPU_NUM * 0.8.
+
+# high-concurrency = 8
+# normal-concurrency = 8
+# low-concurrency = 8
+# max-tasks-high = 16000
+# max-tasks-normal = 16000
+# max-tasks-low = 16000
+# stack-size = "10MB"
+
+[server]
+# set listening address.
+# addr = "127.0.0.1:20160"
+# set advertise listening address for client communication, if not set, use addr instead.
+# advertise-addr = ""
+# notify capacity, 40960 is suitable for about 7000 regions.
+# notify-capacity = 40960
+# maximum number of messages can be processed in one tick.
+# messages-per-tick = 4096
+
+# compression type for grpc channel, available values are no, deflate and gzip.
+# grpc-compression-type = "no"
+# size of thread pool for grpc server.
+# grpc-concurrency = 4
+# The number of max concurrent streams/requests on a client connection.
+# grpc-concurrent-stream = 1024
+# The number of connections with each tikv server to send raft messages.
+# grpc-raft-conn-num = 10
+# Amount to read ahead on individual grpc streams.
+# grpc-stream-initial-window-size = "2MB"
+
+# How many snapshots can be sent concurrently.
+# concurrent-send-snap-limit = 32
+# How many snapshots can be received concurrently.
+# concurrent-recv-snap-limit = 32
+
+# max count of tasks being handled, new tasks will be rejected.
+# end-point-max-tasks = 2000
+
+# max recursion level allowed when decoding dag expression
+# end-point-recursion-limit = 1000
+
+# max time to handle coprocessor request before timeout
+# end-point-request-max-handle-duration = "60s"
+
+# the max bytes that snapshot can be written to disk in one second,
+# should be set based on your disk performance
+# snap-max-write-bytes-per-sec = "100MB"
+
+# set attributes about this server, e.g. { zone = "us-west-1", disk = "ssd" }.
+# labels = {}
+
+[storage]
+# set the path to rocksdb directory.
+# data-dir = "/tmp/tikv/store"
+
+# notify capacity of scheduler's channel
+# scheduler-notify-capacity = 10240
+
+# maximum number of messages can be processed in one tick
+# scheduler-messages-per-tick = 1024
+
+# the number of slots in scheduler latches, concurrency control for write.
+# scheduler-concurrency = 2048000
+
+# Scheduler's worker pool size; increase it for heavy write cases,
+# but keep it below the total number of CPU cores.
+# scheduler-worker-pool-size = 4
+
+# When the pending write bytes exceeds this threshold,
+# the "scheduler too busy" error is displayed.
+# scheduler-pending-write-threshold = "100MB"
+
+[pd]
+# pd endpoints
+# endpoints = []
+
+[metric]
+# the Prometheus client push interval. Setting the value to 0s stops Prometheus client from pushing.
+# interval = "15s"
+# the Prometheus pushgateway address. Leaving it empty stops Prometheus client from pushing.
+address = "pushgateway:9091"
+# the Prometheus client push job name. Note: A node id will automatically append, e.g., "tikv_1".
+# job = "tikv"
+
+[raftstore]
+# true (default value) for high reliability, this can prevent data loss when power failure.
+# sync-log = true
+
+# set the path to raftdb directory, default value is data-dir/raft
+# raftdb-path = ""
+
+# set store capacity, if no set, use disk capacity.
+# capacity = 0
+
+# notify capacity, 40960 is suitable for about 7000 regions.
+# notify-capacity = 40960
+
+# maximum number of messages can be processed in one tick.
+# messages-per-tick = 4096
+
+# Region heartbeat tick interval for reporting to pd.
+# pd-heartbeat-tick-interval = "60s"
+# Store heartbeat tick interval for reporting to pd.
+# pd-store-heartbeat-tick-interval = "10s"
+
+# When the accumulated size change of a region exceeds region-split-check-diff, check
+# whether the region should be split.
+# region-split-check-diff = "6MB"
+
+# Interval to check region whether need to be split or not.
+# split-region-check-tick-interval = "10s"
+
+# When raft entry exceed the max size, reject to propose the entry.
+# raft-entry-max-size = "8MB"
+
+# Interval to gc unnecessary raft log.
+# raft-log-gc-tick-interval = "10s"
+# A threshold to gc stale raft log, must >= 1.
+# raft-log-gc-threshold = 50
+# When entry count exceed this value, gc will be forced trigger.
+# raft-log-gc-count-limit = 72000
+# When the approximate size of raft log entries exceed this value, gc will be forced trigger.
+# It's recommended to set it to 3/4 of region-split-size.
+# raft-log-gc-size-limit = "72MB"
+
+# When a peer hasn't been active for max-peer-down-duration,
+# we will consider this peer to be down and report it to pd.
+# max-peer-down-duration = "5m"
+
+# Interval to check whether start manual compaction for a region,
+# region-compact-check-interval = "5m"
+# Number of regions for each time to check.
+# region-compact-check-step = 100
+# The minimum number of delete tombstones to trigger manual compaction.
+# region-compact-min-tombstones = 10000
+# Interval to check whether should start a manual compaction for lock column family,
+# if written bytes reach lock-cf-compact-threshold for lock column family, will fire
+# a manual compaction for lock column family.
+# lock-cf-compact-interval = "10m"
+# lock-cf-compact-bytes-threshold = "256MB"
+
+# Interval (s) to check region whether the data are consistent.
+# consistency-check-interval = 0
+
+# Use delete range to drop a large number of continuous keys.
+# use-delete-range = false
+
+# delay time before deleting a stale peer
+# clean-stale-peer-delay = "10m"
+
+# Interval to cleanup import sst files.
+# cleanup-import-sst-interval = "10m"
+
+[coprocessor]
+# When it is true, it will try to split a region with table prefix if
+# that region crosses tables. It is recommended to turn off this option
+# if there will be a large number of tables created.
+# split-region-on-table = true
+# When the region's size exceeds region-max-size, we will split the region
+# into two which the left region's size will be region-split-size or a little
+# bit smaller.
+# region-max-size = "144MB"
+# region-split-size = "96MB"
+
+[rocksdb]
+# Maximum number of concurrent background jobs (compactions and flushes)
+# max-background-jobs = 8
+
+# This value represents the maximum number of threads that will concurrently perform a
+# compaction job by breaking it into multiple, smaller ones that are run simultaneously.
+# Default: 1 (i.e. no subcompactions)
+# max-sub-compactions = 1
+
+# Number of open files that can be used by the DB. You may need to
+# increase this if your database has a large working set. Value -1 means
+# files opened are always kept open. You can estimate number of files based
+# on target_file_size_base and target_file_size_multiplier for level-based
+# compaction.
+# If max-open-files = -1, RocksDB will prefetch index and filter blocks into
+# block cache at startup, so if your database has a large working set, it will
+# take several minutes to open the db.
+max-open-files = 1024
+
+# Max size of rocksdb's MANIFEST file.
+# For detailed explanation please refer to https://github.com/facebook/rocksdb/wiki/MANIFEST
+# max-manifest-file-size = "20MB"
+
+# If true, the database will be created if it is missing.
+# create-if-missing = true
+
+# rocksdb wal recovery mode
+# 0 : TolerateCorruptedTailRecords, tolerate incomplete record in trailing data on all logs;
+# 1 : AbsoluteConsistency, We don't expect to find any corruption in the WAL;
+# 2 : PointInTimeRecovery, Recover to point-in-time consistency;
+# 3 : SkipAnyCorruptedRecords, Recovery after a disaster;
+# wal-recovery-mode = 2
+
+# rocksdb write-ahead logs dir path
+# This specifies the absolute dir path for write-ahead logs (WAL).
+# If it is empty, the log files will be in the same dir as data.
+# When you set the path to rocksdb directory in memory like in /dev/shm, you may want to set
+# wal-dir to a directory on a persistent storage.
+# See https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database
+# wal-dir = "/tmp/tikv/store"
+
+# The following two fields affect how archived write-ahead logs will be deleted.
+# 1. If both set to 0, logs will be deleted asap and will not get into the archive.
+# 2. If wal-ttl-seconds is 0 and wal-size-limit is not 0,
+# WAL files will be checked every 10 min and if total size is greater
+# than wal-size-limit, they will be deleted starting with the
+# earliest until size_limit is met. All empty files will be deleted.
+# 3. If wal-ttl-seconds is not 0 and wal-size-limit is 0, then
+# WAL files will be checked every wal-ttl-seconds / 2 and those that
+# are older than wal-ttl-seconds will be deleted.
+# 4. If both are not 0, WAL files will be checked every 10 min and both
+# checks will be performed with ttl being first.
+# When you set the path to rocksdb directory in memory like in /dev/shm, you may want to set
+# wal-ttl-seconds to a value greater than 0 (like 86400) and backup your db on a regular basis.
+# See https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database
+# wal-ttl-seconds = 0
+# wal-size-limit = 0
+
+# rocksdb max total wal size
+# max-total-wal-size = "4GB"
+
+# Rocksdb Statistics provides cumulative stats over time.
+# Turning statistics on introduces about 5%-10% overhead for RocksDB,
+# but it is worth it to know the internal status of RocksDB.
+# enable-statistics = true
+
+# Dump statistics periodically in information logs.
+# Same as rocksdb's default value (10 min).
+# stats-dump-period = "10m"
+
+# Per the RocksDB FAQ (https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ):
+# if you want to use RocksDB on multiple disks or spinning disks, you should set this value to at
+# least 2MB.
+# compaction-readahead-size = 0
+
+# This is the maximum buffer size that is used by WritableFileWrite
+# writable-file-max-buffer-size = "1MB"
+
+# Use O_DIRECT for both reads and writes in background flush and compactions
+# use-direct-io-for-flush-and-compaction = false
+
+# Limit the disk IO of compaction and flush. Compaction and flush can cause
+# terrible spikes if they exceed a certain threshold. Consider setting this to
+# 50% ~ 80% of the disk throughput for a more stable result. However, in heavy
+# write workload, limiting compaction and flush speed can cause write stalls too.
+# rate-bytes-per-sec = 0
+
+# Enable or disable the pipelined write
+# enable-pipelined-write = true
+
+# Allows OS to incrementally sync files to disk while they are being
+# written, asynchronously, in the background.
+# bytes-per-sync = "0MB"
+
+# Allows OS to incrementally sync WAL to disk while it is being written.
+# wal-bytes-per-sync = "0KB"
+
+# Specify the maximal size of the Rocksdb info log file. If the log file
+# is larger than `max_log_file_size`, a new info log file will be created.
+# If max_log_file_size == 0, all logs will be written to one log file.
+# Default: 1GB
+# info-log-max-size = "1GB"
+
+# Time for the Rocksdb info log file to roll (in seconds).
+# If specified with non-zero value, log file will be rolled
+# if it has been active longer than `log_file_time_to_roll`.
+# Default: 0 (disabled)
+# info-log-roll-time = "0"
+
+# Maximal Rocksdb info log files to be kept.
+# Default: 10
+# info-log-keep-log-file-num = 10
+
+# This specifies the Rocksdb info LOG dir.
+# If it is empty, the log files will be in the same dir as data.
+# If it is non empty, the log files will be in the specified dir,
+# and the db data dir's absolute path will be used as the log file
+# name's prefix.
+# Default: empty
+# info-log-dir = ""
+
+# Column Family default used to store actual data of the database.
+[rocksdb.defaultcf]
+# compression method (if any) is used to compress a block.
+# no: kNoCompression
+# snappy: kSnappyCompression
+# zlib: kZlibCompression
+# bzip2: kBZip2Compression
+# lz4: kLZ4Compression
+# lz4hc: kLZ4HCCompression
+# zstd: kZSTD
+
+# per level compression
+# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]
+
+# Approximate size of user data packed per block. Note that the
+# block size specified here corresponds to uncompressed data.
+# block-size = "64KB"
+
+# If you're doing point lookups you definitely want to turn bloom filters on. We use
+# bloom filters to avoid unnecessary disk reads. Default bits_per_key is 10, which
+# yields ~1% false positive rate. Larger bits_per_key values will reduce false positive
+# rate, but increase memory usage and space amplification.
+# bloom-filter-bits-per-key = 10
+
+# false means one bloom filter per SST file; true means every block has a corresponding bloom filter
+# block-based-bloom-filter = false
+
+# level0-file-num-compaction-trigger = 4
+
+# Soft limit on number of level-0 files. We start slowing down writes at this point.
+# level0-slowdown-writes-trigger = 20
+
+# Maximum number of level-0 files. We stop writes at this point.
+# level0-stop-writes-trigger = 36
+
+# Amount of data to build up in memory (backed by an unsorted log
+# on disk) before converting to a sorted on-disk file.
+# write-buffer-size = "128MB"
+
+# The maximum number of write buffers that are built up in memory.
+# max-write-buffer-number = 5
+
+# The minimum number of write buffers that will be merged together
+# before writing to storage.
+# min-write-buffer-number-to-merge = 1
+
+# Control maximum total data size for base level (level 1).
+# max-bytes-for-level-base = "512MB"
+
+# Target file size for compaction.
+# target-file-size-base = "8MB"
+
+# Max bytes for compaction.max_compaction_bytes
+# max-compaction-bytes = "2GB"
+
+# There are four different algorithms to pick files to compact.
+# 0 : ByCompensatedSize
+# 1 : OldestLargestSeqFirst
+# 2 : OldestSmallestSeqFirst
+# 3 : MinOverlappingRatio
+# compaction-pri = 3
+
+# block-cache used to cache uncompressed blocks, big block-cache can speed up read.
+# in normal cases should tune to 30%-50% system's total memory.
+# block-cache-size = "1GB"
+
+# Indicating if we'd put index/filter blocks to the block cache.
+# If not specified, each "table reader" object will pre-load index/filter block
+# during table initialization.
+# cache-index-and-filter-blocks = true
+
+# Pin level0 filter and index blocks in cache.
+# pin-l0-filter-and-index-blocks = true
+
+# Enable read amplification statistics.
+# value => memory usage (percentage of loaded blocks memory)
+# 1 => 12.50 %
+# 2 => 06.25 %
+# 4 => 03.12 %
+# 8 => 01.56 %
+# 16 => 00.78 %
+# read-amp-bytes-per-bit = 0
+
+# Pick target size of each level dynamically.
+# dynamic-level-bytes = true
+
+# Options for the write column family,
+# which is used to store commit information in the MVCC model.
+[rocksdb.writecf]
+# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]
+# block-size = "64KB"
+# write-buffer-size = "128MB"
+# max-write-buffer-number = 5
+# min-write-buffer-number-to-merge = 1
+# max-bytes-for-level-base = "512MB"
+# target-file-size-base = "8MB"
+
+# in normal cases should tune to 10%-30% system's total memory.
+# block-cache-size = "256MB"
+# level0-file-num-compaction-trigger = 4
+# level0-slowdown-writes-trigger = 20
+# level0-stop-writes-trigger = 36
+# cache-index-and-filter-blocks = true
+# pin-l0-filter-and-index-blocks = true
+# compaction-pri = 3
+# read-amp-bytes-per-bit = 0
+# dynamic-level-bytes = true
+
+[rocksdb.lockcf]
+# compression-per-level = ["no", "no", "no", "no", "no", "no", "no"]
+# block-size = "16KB"
+# write-buffer-size = "128MB"
+# max-write-buffer-number = 5
+# min-write-buffer-number-to-merge = 1
+# max-bytes-for-level-base = "128MB"
+# target-file-size-base = "8MB"
+# block-cache-size = "256MB"
+# level0-file-num-compaction-trigger = 1
+# level0-slowdown-writes-trigger = 20
+# level0-stop-writes-trigger = 36
+# cache-index-and-filter-blocks = true
+# pin-l0-filter-and-index-blocks = true
+# compaction-pri = 0
+# read-amp-bytes-per-bit = 0
+# dynamic-level-bytes = true
+
+[raftdb]
+# max-sub-compactions = 1
+max-open-files = 1024
+# max-manifest-file-size = "20MB"
+# create-if-missing = true
+
+# enable-statistics = true
+# stats-dump-period = "10m"
+
+# compaction-readahead-size = 0
+# writable-file-max-buffer-size = "1MB"
+# use-direct-io-for-flush-and-compaction = false
+# enable-pipelined-write = true
+# allow-concurrent-memtable-write = false
+# bytes-per-sync = "0MB"
+# wal-bytes-per-sync = "0KB"
+
+# info-log-max-size = "1GB"
+# info-log-roll-time = "0"
+# info-log-keep-log-file-num = 10
+# info-log-dir = ""
+
+[raftdb.defaultcf]
+# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]
+# block-size = "64KB"
+# write-buffer-size = "128MB"
+# max-write-buffer-number = 5
+# min-write-buffer-number-to-merge = 1
+# max-bytes-for-level-base = "512MB"
+# target-file-size-base = "8MB"
+
+# should tune to 256MB~2GB.
+# block-cache-size = "256MB"
+# level0-file-num-compaction-trigger = 4
+# level0-slowdown-writes-trigger = 20
+# level0-stop-writes-trigger = 36
+# cache-index-and-filter-blocks = true
+# pin-l0-filter-and-index-blocks = true
+# compaction-pri = 0
+# read-amp-bytes-per-bit = 0
+# dynamic-level-bytes = true
+
+[security]
+# set the path for certificates. An empty string disables secure connections.
+# ca-path = ""
+# cert-path = ""
+# key-path = ""
+
+[import]
+# the directory to store importing kv data.
+# import-dir = "/tmp/tikv/import"
+# number of threads to handle RPC requests.
+# num-threads = 8
+# stream channel window size, stream will be blocked on channel full.
+# stream-channel-window = 128