[tidb] Fix tikv.pd.addresses not taking effect and use ResolvedSchema instead of TableSchema.

pull/898/head
gongzhongqiang committed by Leonard Xu
parent a06789e346
commit e718445aa5
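
For context, the option this commit fixes is the tikv.pd.addresses table option of the tidb-cdc connector: as the factory diff below shows, the configured PD endpoints were previously not passed through to the TiDBTableSource. A minimal Flink SQL DDL exercising the option, adapted from the integration test in this commit (the host, PD address, credentials and column list are illustrative placeholders), could look like:

    CREATE TABLE tidb_source (
        `id` INT NOT NULL,
        name STRING,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'tidb-cdc',
        'hostname' = 'tidb-host',
        'tikv.pd.addresses' = 'pd-host:2379',
        'username' = 'root',
        'password' = '',
        'database-name' = 'inventory',
        'table-name' = 'products'
    );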

@@ -162,9 +162,16 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<groupId>org.testcontainers</groupId>
<artifactId>mssqlserver</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${version.awaitility}</version>
<scope>test</scope>
</dependency>

@@ -19,7 +19,7 @@
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -46,12 +46,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class TiDBTableSource implements ScanTableSource {
private final TableSchema physicalSchema;
private final ResolvedSchema physicalSchema;
private final String hostname;
private final String database;
private final String tableName;
private final String username;
private final String password;
private final String pdAddresses;
private final StartupOptions startupOptions;
private final Map<String, String> options;
@@ -63,12 +64,13 @@ public class TiDBTableSource implements ScanTableSource {
protected DataType producedDataType;
public TiDBTableSource(
TableSchema physicalSchema,
ResolvedSchema physicalSchema,
String hostname,
String database,
String tableName,
String username,
String password,
String pdAddresses,
StartupOptions startupOptions,
Map<String, String> options) {
this.physicalSchema = physicalSchema;
@@ -77,6 +79,7 @@ public class TiDBTableSource implements ScanTableSource {
this.tableName = checkNotNull(tableName);
this.username = checkNotNull(username);
this.password = checkNotNull(password);
this.pdAddresses = checkNotNull(pdAddresses);
this.startupOptions = startupOptions;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.options = options;
@@ -99,8 +102,7 @@ public class TiDBTableSource implements ScanTableSource {
try (final TiSession session = TiSession.create(tiConf)) {
final TiTableInfo tableInfo = session.getCatalog().getTable(database, tableName);
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toRowDataType());
TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
RowDataTiKVSnapshotEventDeserializationSchema snapshotEventDeserializationSchema =
new RowDataTiKVSnapshotEventDeserializationSchema(typeInfo, tableInfo);
RowDataTiKVChangeEventDeserializationSchema changeEventDeserializationSchema =
@@ -134,6 +136,7 @@ public class TiDBTableSource implements ScanTableSource {
tableName,
username,
password,
pdAddresses,
startupOptions,
options);
source.producedDataType = producedDataType;

@@ -20,14 +20,11 @@ package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import com.ververica.cdc.connectors.tidb.TDBSourceOptions;
import java.util.HashSet;
import java.util.Set;
@@ -37,6 +34,13 @@ import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.SCAN_STARTUP_MODE;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TABLE_NAME;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_DELETE_CONCURRENCY;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_GET_CONCURRENCY;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_PUT_CONCURRENCY;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_GRPC_TIMEOUT;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_PD_ADDRESSES;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.USERNAME;
/** Factory for creating configured instance of {@link TiDBTableSource}. */
@@ -55,9 +59,9 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
String password = config.get(PASSWORD);
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
String pdAddresses = config.get(TIKV_PD_ADDRESSES);
StartupOptions startupOptions = getStartupOptions(config);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
return new TiDBTableSource(
physicalSchema,
@@ -66,6 +70,7 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
tableName,
username,
password,
pdAddresses,
startupOptions,
context.getCatalogTable().getOptions());
}
@@ -83,7 +88,7 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(PASSWORD);
options.add(DATABASE_NAME);
options.add(TABLE_NAME);
options.add(TDBSourceOptions.TIKV_PD_ADDRESSES);
options.add(TIKV_PD_ADDRESSES);
return options;
}
@@ -91,12 +96,12 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(SCAN_STARTUP_MODE);
options.add(TDBSourceOptions.TIKV_GRPC_TIMEOUT);
options.add(TDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT);
options.add(TDBSourceOptions.TIKV_BATCH_GET_CONCURRENCY);
options.add(TDBSourceOptions.TIKV_BATCH_PUT_CONCURRENCY);
options.add(TDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY);
options.add(TDBSourceOptions.TIKV_BATCH_DELETE_CONCURRENCY);
options.add(TIKV_GRPC_TIMEOUT);
options.add(TIKV_GRPC_SCAN_TIMEOUT);
options.add(TIKV_BATCH_GET_CONCURRENCY);
options.add(TIKV_BATCH_PUT_CONCURRENCY);
options.add(TIKV_BATCH_SCAN_CONCURRENCY);
options.add(TIKV_BATCH_DELETE_CONCURRENCY);
return options;
}

@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tidb;
import org.apache.flink.test.util.AbstractTestBase;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.lifecycle.Startables;
import java.io.File;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertNotNull;
/** Utility class for TiDB tests. */
public class TiDBTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TiDBTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
public static final String TIDB_USER = "root";
public static final String TIDB_PASSWORD = "";
public static final int TIDB_PORT = 4000;
public static final int PD_PORT = 2379;
public static final String TIDB_SERVICE_NAME = "tidb";
public static final String TIKV_SERVICE_NAME = "tikv";
public static final String PD_SERVICE_NAME = "pd";
public static final DockerComposeContainer TIDB_DOCKER_COMPOSE =
new DockerComposeContainer(new File("src/test/resources/docker/docker-compose.yml"))
.withExposedService(
TIDB_SERVICE_NAME + "_1",
TIDB_PORT,
Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(120)))
.withExposedService(
PD_SERVICE_NAME + "_1",
PD_PORT,
Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(120)))
.withLocalCompose(true);
@BeforeClass
public static void startContainers() throws Exception {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(TIDB_DOCKER_COMPOSE)).join();
LOG.info("Containers are started.");
}
@AfterClass
public static void stopContainers() {
if (TIDB_DOCKER_COMPOSE != null) {
TIDB_DOCKER_COMPOSE.stop();
}
}
public String getTIDBHost() {
return TIDB_DOCKER_COMPOSE.getServiceHost(TIDB_SERVICE_NAME, TIDB_PORT);
}
public Integer getTIDBPort() {
return TIDB_DOCKER_COMPOSE.getServicePort(TIDB_SERVICE_NAME, TIDB_PORT);
}
public String getPDHost() {
return TIDB_DOCKER_COMPOSE.getServiceHost(PD_SERVICE_NAME, PD_PORT);
}
public Integer getPDPort() {
return TIDB_DOCKER_COMPOSE.getServicePort(PD_SERVICE_NAME, PD_PORT);
}
public String getJdbcUrl(String databaseName) {
return "jdbc:mysql://" + getTIDBHost() + ":" + getTIDBPort() + "/" + databaseName;
}
protected Connection getJdbcConnection(String databaseName) throws SQLException {
return DriverManager.getConnection(getJdbcUrl(databaseName), TIDB_USER, TIDB_PASSWORD);
}
private static void dropTestDatabase(Connection connection, String databaseName)
throws SQLException {
try {
Awaitility.await(String.format("Dropping database %s", databaseName))
.atMost(120, TimeUnit.SECONDS)
.until(
() -> {
try {
String sql =
String.format(
"DROP DATABASE IF EXISTS %s", databaseName);
connection.createStatement().execute(sql);
return true;
} catch (SQLException e) {
LOG.warn(
String.format(
"DROP DATABASE %s failed: {}", databaseName),
e.getMessage());
return false;
}
});
} catch (ConditionTimeoutException e) {
throw new IllegalStateException("Failed to drop test database", e);
}
}
/**
* Initializes the test database by dropping it if it already exists and then executing the
* statements from the given DDL file over a JDBC connection.
*/
protected void initializeTidbTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile = TiDBTestBase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection("");
Statement statement = connection.createStatement()) {
dropTestDatabase(connection, sqlFile);
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

@@ -18,26 +18,19 @@
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import com.ververica.cdc.connectors.tidb.TiDBTestBase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.Ignore;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
@@ -48,11 +41,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Integration tests for TiDB change stream event SQL source. */
public class TiDBConnectorITCase extends TestLogger {
private static final String FLINK_USER = "flinkuser";
private static final String FLINK_USER_PASSWORD = "flinkpwd";
private static final int DEFAULT_PARALLELISM = 4;
public class TiDBConnectorITCase extends TiDBTestBase {
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
@@ -63,26 +52,15 @@ public class TiDBConnectorITCase extends TestLogger {
@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(4)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.setParallelism(1);
}
@Test
public void testConsumingAllEvents() throws Exception {
initializeTidbTable("inventory");
String sourceDDL =
String.format(
"CREATE TABLE tidb_source ("
@@ -93,7 +71,7 @@ public class TiDBConnectorITCase extends TestLogger {
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'tidb-cdc',"
+ " 'hostname' = '127.0.0.1',"
+ " 'hostname' = '%s',"
+ " 'tikv.grpc.timeout_in_ms' = '20000',"
+ " 'tikv.pd.addresses' = '%s',"
+ " 'username' = '%s',"
@@ -101,7 +79,12 @@ public class TiDBConnectorITCase extends TestLogger {
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
"127.0.0.1:2379", FLINK_USER, FLINK_USER_PASSWORD, "inventory", "products");
getTIDBHost(),
getPDHost() + ":" + getPDPort(),
TIDB_USER,
TIDB_PASSWORD,
"inventory",
"products");
String sinkDDL =
"CREATE TABLE sink ("
@@ -123,9 +106,7 @@ public class TiDBConnectorITCase extends TestLogger {
// wait for the snapshot phase to finish before applying binlog changes
waitForSinkSize("sink", 9);
try (Connection connection =
DriverManager.getConnection(
"jdbc:mysql://root@localhost:4000/inventory", FLINK_USER, null);
try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {
statement.execute(

@@ -16,6 +16,9 @@
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE inventory;
USE inventory;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products

@@ -0,0 +1,101 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# PD Configuration.
name = "pd"
data-dir = "default.pd"
client-urls = "http://127.0.0.1:2379"
# if not set, use ${client-urls}
advertise-client-urls = ""
peer-urls = "http://127.0.0.1:2380"
# if not set, use ${peer-urls}
advertise-peer-urls = ""
initial-cluster = "pd=http://127.0.0.1:2380"
initial-cluster-state = "new"
lease = 3
tso-save-interval = "3s"
[security]
# Path of file that contains list of trusted SSL CAs. if set, following four settings shouldn't be empty
cacert-path = ""
# Path of file that contains X509 certificate in PEM format.
cert-path = ""
# Path of file that contains X509 key in PEM format.
key-path = ""
[log]
level = "error"
# log format, one of json, text, console
#format = "text"
# disable automatic timestamps in output
#disable-timestamp = false
# file logging
[log.file]
#filename = ""
# max log file size in MB
#max-size = 300
# max log file keep days
#max-days = 28
# maximum number of old log files to retain
#max-backups = 7
# rotate log by day
#log-rotate = true
[metric]
# prometheus client push interval, set "0s" to disable prometheus.
interval = "15s"
# prometheus pushgateway address, leave it empty to disable prometheus.
address = "pushgateway:9091"
[schedule]
max-merge-region-size = 0
split-merge-interval = "1h"
max-snapshot-count = 3
max-pending-peer-count = 16
max-store-down-time = "30m"
leader-schedule-limit = 4
region-schedule-limit = 4
replica-schedule-limit = 8
merge-schedule-limit = 8
tolerant-size-ratio = 5.0
# customized schedulers, the format is as below
# if empty, it will use balance-leader, balance-region, hot-region as default
# [[schedule.schedulers]]
# type = "evict-leader"
# args = ["1"]
[replication]
# The number of replicas for each region.
max-replicas = 3
# The label keys specified the location of a store.
# The placement priorities are implied by the order of label keys.
# For example, ["zone", "rack"] means that we should place replicas to
# different zones first, then to different racks if we don't have enough zones.
location-labels = []
[label-property]
# Do not assign region leaders to stores that have these tags.
# [[label-property.reject-leader]]
# key = "zone"
# value = "cn1"

@@ -0,0 +1,233 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TiDB Configuration.
# TiDB server host.
host = "0.0.0.0"
# TiDB server port.
port = 4000
# Registered store name, [tikv, mocktikv]
store = "tikv"
# TiDB storage path.
path = "/tmp/tidb"
# The socket file to use for connection.
socket = ""
# Run ddl worker on this tidb-server.
run-ddl = true
# Schema lease duration, very dangerous to change only if you know what you do.
lease = "0"
# When create table, split a separated region for it. It is recommended to
# turn off this option if there will be a large number of tables created.
split-table = true
# The limit of concurrent executed sessions.
token-limit = 1000
# Only print a log when out of memory quota.
# Valid options: ["log", "cancel"]
oom-action = "log"
# Set the memory quota for a query in bytes. Default: 32GB
mem-quota-query = 34359738368
# Set system variable 'lower_case_table_names'
lower-case-table-names = 2
[log]
# Log level: debug, info, warn, error, fatal.
level = "error"
# Log format, one of json, text, console.
format = "text"
# Disable automatic timestamp in output
disable-timestamp = false
# Stores slow query log into separated files.
slow-query-file = ""
# Queries with execution time greater than this value will be logged. (Milliseconds)
slow-threshold = 300
# Queries with internal result greater than this value will be logged.
expensive-threshold = 10000
# Maximum query length recorded in log.
query-log-max-len = 2048
# File logging.
[log.file]
# Log file name.
filename = ""
# Max log file size in MB (upper limit to 4096MB).
max-size = 300
# Max log file keep days. No clean up by default.
max-days = 0
# Maximum number of old log files to retain. No clean up by default.
max-backups = 0
[security]
# Path of file that contains list of trusted SSL CAs for connection with mysql client.
ssl-ca = ""
# Path of file that contains X509 certificate in PEM format for connection with mysql client.
ssl-cert = ""
# Path of file that contains X509 key in PEM format for connection with mysql client.
ssl-key = ""
# Path of file that contains list of trusted SSL CAs for connection with cluster components.
cluster-ssl-ca = ""
# Path of file that contains X509 certificate in PEM format for connection with cluster components.
cluster-ssl-cert = ""
# Path of file that contains X509 key in PEM format for connection with cluster components.
cluster-ssl-key = ""
[status]
# If enable status report HTTP service.
report-status = true
# TiDB status port.
status-port = 10080
# Prometheus client push interval in second, set \"0\" to disable prometheus push.
metrics-interval = 15
[performance]
# Max CPUs to use, 0 use number of CPUs in the machine.
max-procs = 0
# StmtCountLimit limits the max count of statement inside a transaction.
stmt-count-limit = 5000
# Set keep alive option for tcp connection.
tcp-keep-alive = true
# Whether support cartesian product.
cross-join = true
# Stats lease duration, which influences the time of analyze and stats load.
stats-lease = "3s"
# Run auto analyze worker on this tidb-server.
run-auto-analyze = true
# Probability to use the query feedback to update stats, 0 or 1 for always false/true.
feedback-probability = 0.0
# The max number of query feedback that cache in memory.
query-feedback-limit = 1024
# Pseudo stats will be used if the ratio between the modify count and
# row count in statistics of a table is greater than it.
pseudo-estimate-ratio = 0.7
[proxy-protocol]
# PROXY protocol acceptable client networks.
# Empty string means disable PROXY protocol, * means all networks.
networks = ""
# PROXY protocol header read timeout, unit is second
header-timeout = 5
[opentracing]
# Enable opentracing.
enable = false
# Whether to enable the rpc metrics.
rpc-metrics = false
[opentracing.sampler]
# Type specifies the type of the sampler: const, probabilistic, rateLimiting, or remote
type = "const"
# Param is a value passed to the sampler.
# Valid values for Param field are:
# - for "const" sampler, 0 or 1 for always false/true respectively
# - for "probabilistic" sampler, a probability between 0 and 1
# - for "rateLimiting" sampler, the number of spans per second
# - for "remote" sampler, param is the same as for "probabilistic"
# and indicates the initial sampling rate before the actual one
# is received from the mothership
param = 1.0
# SamplingServerURL is the address of jaeger-agent's HTTP sampling server
sampling-server-url = ""
# MaxOperations is the maximum number of operations that the sampler
# will keep track of. If an operation is not tracked, a default probabilistic
# sampler will be used rather than the per operation specific sampler.
max-operations = 0
# SamplingRefreshInterval controls how often the remotely controlled sampler will poll
# jaeger-agent for the appropriate sampling strategy.
sampling-refresh-interval = 0
[opentracing.reporter]
# QueueSize controls how many spans the reporter can keep in memory before it starts dropping
# new spans. The queue is continuously drained by a background go-routine, as fast as spans
# can be sent out of process.
queue-size = 0
# BufferFlushInterval controls how often the buffer is force-flushed, even if it's not full.
# It is generally not useful, as it only matters for very low traffic services.
buffer-flush-interval = 0
# LogSpans, when true, enables LoggingReporter that runs in parallel with the main reporter
# and logs all submitted spans. Main Configuration.Logger must be initialized in the code
# for this option to have any effect.
log-spans = false
# LocalAgentHostPort instructs reporter to send spans to jaeger-agent at this address
local-agent-host-port = ""
[tikv-client]
# Max gRPC connections that will be established with each tikv-server.
grpc-connection-count = 16
# After a duration of this time in seconds if the client doesn't see any activity it pings
# the server to see if the transport is still alive.
grpc-keepalive-time = 10
# After having pinged for keepalive check, the client waits for a duration of Timeout in seconds
# and if no activity is seen even after that the connection is closed.
grpc-keepalive-timeout = 3
# max time for commit command, must be twice bigger than raft election timeout.
commit-timeout = "41s"
[binlog]
# Socket file to write binlog.
binlog-socket = ""
# WriteTimeout specifies how long it will wait for writing binlog to pump.
write-timeout = "15s"
# If IgnoreError is true, when writing binlog meets an error, TiDB would stop writing binlog,
# but still provide service.
ignore-error = false

@@ -0,0 +1,513 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TiKV config template
# Human-readable big numbers:
# File size(based on byte): KB, MB, GB, TB, PB
# e.g.: 1_048_576 = "1MB"
# Time(based on ms): ms, s, m, h
# e.g.: 78_000 = "1.3m"
# log level: trace, debug, info, warn, error, off.
log-level = "error"
# file to store log, write to stderr if it's empty.
# log-file = ""
log-rotation-size="500MB"
[readpool.storage]
# size of thread pool for high-priority operations
# high-concurrency = 4
# size of thread pool for normal-priority operations
# normal-concurrency = 4
# size of thread pool for low-priority operations
# low-concurrency = 4
# max running high-priority operations, reject if exceed
# max-tasks-high = 8000
# max running normal-priority operations, reject if exceed
# max-tasks-normal = 8000
# max running low-priority operations, reject if exceed
# max-tasks-low = 8000
# size of stack size for each thread pool
# stack-size = "10MB"
[readpool.coprocessor]
# Notice: if CPU_NUM > 8, default thread pool size for coprocessors
# will be set to CPU_NUM * 0.8.
# high-concurrency = 8
# normal-concurrency = 8
# low-concurrency = 8
# max-tasks-high = 16000
# max-tasks-normal = 16000
# max-tasks-low = 16000
# stack-size = "10MB"
[server]
# set listening address.
# addr = "127.0.0.1:20160"
# set advertise listening address for client communication, if not set, use addr instead.
# advertise-addr = ""
# notify capacity, 40960 is suitable for about 7000 regions.
# notify-capacity = 40960
# maximum number of messages can be processed in one tick.
# messages-per-tick = 4096
# compression type for grpc channel, available values are no, deflate and gzip.
# grpc-compression-type = "no"
# size of thread pool for grpc server.
# grpc-concurrency = 4
# The number of max concurrent streams/requests on a client connection.
# grpc-concurrent-stream = 1024
# The number of connections with each tikv server to send raft messages.
# grpc-raft-conn-num = 10
# Amount to read ahead on individual grpc streams.
# grpc-stream-initial-window-size = "2MB"
# How many snapshots can be sent concurrently.
# concurrent-send-snap-limit = 32
# How many snapshots can be recv concurrently.
# concurrent-recv-snap-limit = 32
# max count of tasks being handled, new tasks will be rejected.
# end-point-max-tasks = 2000
# max recursion level allowed when decoding dag expression
# end-point-recursion-limit = 1000
# max time to handle coprocessor request before timeout
# end-point-request-max-handle-duration = "60s"
# the max bytes that snapshot can be written to disk in one second,
# should be set based on your disk performance
# snap-max-write-bytes-per-sec = "100MB"
# set attributes about this server, e.g. { zone = "us-west-1", disk = "ssd" }.
# labels = {}
[storage]
# set the path to rocksdb directory.
# data-dir = "/tmp/tikv/store"
# notify capacity of scheduler's channel
# scheduler-notify-capacity = 10240
# maximum number of messages can be processed in one tick
# scheduler-messages-per-tick = 1024
# the number of slots in scheduler latches, concurrency control for write.
# scheduler-concurrency = 2048000
# scheduler's worker pool size, should increase it in heavy write cases,
# also should less than total cpu cores.
# scheduler-worker-pool-size = 4
# When the pending write bytes exceeds this threshold,
# the "scheduler too busy" error is displayed.
# scheduler-pending-write-threshold = "100MB"
[pd]
# pd endpoints
# endpoints = []
[metric]
# the Prometheus client push interval. Setting the value to 0s stops Prometheus client from pushing.
# interval = "15s"
# the Prometheus pushgateway address. Leaving it empty stops Prometheus client from pushing.
address = "pushgateway:9091"
# the Prometheus client push job name. Note: A node id will automatically append, e.g., "tikv_1".
# job = "tikv"
[raftstore]
# true (default value) for high reliability, this can prevent data loss when power failure.
# sync-log = true
# set the path to raftdb directory, default value is data-dir/raft
# raftdb-path = ""
# set store capacity, if no set, use disk capacity.
# capacity = 0
# notify capacity, 40960 is suitable for about 7000 regions.
# notify-capacity = 40960
# maximum number of messages can be processed in one tick.
# messages-per-tick = 4096
# Region heartbeat tick interval for reporting to pd.
# pd-heartbeat-tick-interval = "60s"
# Store heartbeat tick interval for reporting to pd.
# pd-store-heartbeat-tick-interval = "10s"
# When region size changes exceeds region-split-check-diff, we should check
# whether the region should be split or not.
# region-split-check-diff = "6MB"
# Interval to check region whether need to be split or not.
# split-region-check-tick-interval = "10s"
# When raft entry exceed the max size, reject to propose the entry.
# raft-entry-max-size = "8MB"
# Interval to gc unnecessary raft log.
# raft-log-gc-tick-interval = "10s"
# A threshold to gc stale raft log, must >= 1.
# raft-log-gc-threshold = 50
# When entry count exceed this value, gc will be forced trigger.
# raft-log-gc-count-limit = 72000
# When the approximate size of raft log entries exceed this value, gc will be forced trigger.
# It's recommended to set it to 3/4 of region-split-size.
# raft-log-gc-size-limit = "72MB"
# When a peer hasn't been active for max-peer-down-duration,
# we will consider this peer to be down and report it to pd.
# max-peer-down-duration = "5m"
# Interval to check whether start manual compaction for a region,
# region-compact-check-interval = "5m"
# Number of regions for each time to check.
# region-compact-check-step = 100
# The minimum number of delete tombstones to trigger manual compaction.
# region-compact-min-tombstones = 10000
# Interval to check whether should start a manual compaction for lock column family,
# if written bytes reach lock-cf-compact-threshold for lock column family, will fire
# a manual compaction for lock column family.
# lock-cf-compact-interval = "10m"
# lock-cf-compact-bytes-threshold = "256MB"
# Interval (s) to check region whether the data are consistent.
# consistency-check-interval = 0
# Use delete range to drop a large number of continuous keys.
# use-delete-range = false
# delay time before deleting a stale peer
# clean-stale-peer-delay = "10m"
# Interval to cleanup import sst files.
# cleanup-import-sst-interval = "10m"
[coprocessor]
# When it is true, it will try to split a region with table prefix if
# that region crosses tables. It is recommended to turn off this option
# if there will be a large number of tables created.
# split-region-on-table = true
# When the region's size exceeds region-max-size, we will split the region
# into two which the left region's size will be region-split-size or a little
# bit smaller.
# region-max-size = "144MB"
# region-split-size = "96MB"
[rocksdb]
# Maximum number of concurrent background jobs (compactions and flushes)
# max-background-jobs = 8
# This value represents the maximum number of threads that will concurrently perform a
# compaction job by breaking it into multiple, smaller ones that are run simultaneously.
# Default: 1 (i.e. no subcompactions)
# max-sub-compactions = 1
# Number of open files that can be used by the DB. You may need to
# increase this if your database has a large working set. Value -1 means
# files opened are always kept open. You can estimate number of files based
# on target_file_size_base and target_file_size_multiplier for level-based
# compaction.
# If max-open-files = -1, RocksDB will prefetch index and filter blocks into
# block cache at startup, so if your database has a large working set, it will
# take several minutes to open the db.
max-open-files = 1024
# Max size of rocksdb's MANIFEST file.
# For detailed explanation please refer to https://github.com/facebook/rocksdb/wiki/MANIFEST
# max-manifest-file-size = "20MB"
# If true, the database will be created if it is missing.
# create-if-missing = true
# rocksdb wal recovery mode
# 0 : TolerateCorruptedTailRecords, tolerate incomplete record in trailing data on all logs;
# 1 : AbsoluteConsistency, We don't expect to find any corruption in the WAL;
# 2 : PointInTimeRecovery, Recover to point-in-time consistency;
# 3 : SkipAnyCorruptedRecords, Recovery after a disaster;
# wal-recovery-mode = 2
# rocksdb write-ahead logs dir path
# This specifies the absolute dir path for write-ahead logs (WAL).
# If it is empty, the log files will be in the same dir as data.
# When you set the path to rocksdb directory in memory like in /dev/shm, you may want to set
# wal-dir to a directory on a persistent storage.
# See https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database
# wal-dir = "/tmp/tikv/store"
# The following two fields affect how archived write-ahead logs will be deleted.
# 1. If both set to 0, logs will be deleted asap and will not get into the archive.
# 2. If wal-ttl-seconds is 0 and wal-size-limit is not 0,
# WAL files will be checked every 10 min and if total size is greater
# than wal-size-limit, they will be deleted starting with the
# earliest until size_limit is met. All empty files will be deleted.
# 3. If wal-ttl-seconds is not 0 and wal-size-limit is 0, then
# WAL files will be checked every wal-ttl-seconds / 2 and those that
# are older than wal-ttl-seconds will be deleted.
# 4. If both are not 0, WAL files will be checked every 10 min and both
# checks will be performed with ttl being first.
# When you set the path to rocksdb directory in memory like in /dev/shm, you may want to set
# wal-ttl-seconds to a value greater than 0 (like 86400) and backup your db on a regular basis.
# See https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database
# wal-ttl-seconds = 0
# wal-size-limit = 0
# rocksdb max total wal size
# max-total-wal-size = "4GB"
# Rocksdb Statistics provides cumulative stats over time.
# Turn statistics on will introduce about 5%-10% overhead for RocksDB,
# but it is worthy to know the internal status of RocksDB.
# enable-statistics = true
# Dump statistics periodically in information logs.
# Same as rocksdb's default value (10 min).
# stats-dump-period = "10m"
# Due to Rocksdb FAQ: https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ,
# If you want to use rocksdb on multi disks or spinning disks, you should set value at
# least 2MB;
# compaction-readahead-size = 0
# This is the maximum buffer size that is used by WritableFileWrite
# writable-file-max-buffer-size = "1MB"
# Use O_DIRECT for both reads and writes in background flush and compactions
# use-direct-io-for-flush-and-compaction = false
# Limit the disk IO of compaction and flush. Compaction and flush can cause
# terrible spikes if they exceed a certain threshold. Consider setting this to
# 50% ~ 80% of the disk throughput for a more stable result. However, in heavy
# write workload, limiting compaction and flush speed can cause write stalls too.
# rate-bytes-per-sec = 0
# Enable or disable the pipelined write
# enable-pipelined-write = true
# Allows OS to incrementally sync files to disk while they are being
# written, asynchronously, in the background.
# bytes-per-sync = "0MB"
# Allows OS to incrementally sync WAL to disk while it is being written.
# wal-bytes-per-sync = "0KB"
# Specify the maximal size of the Rocksdb info log file. If the log file
# is larger than `max_log_file_size`, a new info log file will be created.
# If max_log_file_size == 0, all logs will be written to one log file.
# Default: 1GB
# info-log-max-size = "1GB"
# Time for the Rocksdb info log file to roll (in seconds).
# If specified with non-zero value, log file will be rolled
# if it has been active longer than `log_file_time_to_roll`.
# Default: 0 (disabled)
# info-log-roll-time = "0"
# Maximal Rocksdb info log files to be kept.
# Default: 10
# info-log-keep-log-file-num = 10
# This specifies the Rocksdb info LOG dir.
# If it is empty, the log files will be in the same dir as data.
# If it is non empty, the log files will be in the specified dir,
# and the db data dir's absolute path will be used as the log file
# name's prefix.
# Default: empty
# info-log-dir = ""
# Column Family default used to store actual data of the database.
[rocksdb.defaultcf]
# compression method (if any) is used to compress a block.
# no: kNoCompression
# snappy: kSnappyCompression
# zlib: kZlibCompression
# bzip2: kBZip2Compression
# lz4: kLZ4Compression
# lz4hc: kLZ4HCCompression
# zstd: kZSTD
# per level compression
# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]
# Approximate size of user data packed per block. Note that the
# block size specified here corresponds to uncompressed data.
# block-size = "64KB"
# If you're doing point lookups you definitely want to turn bloom filters on, We use
# bloom filters to avoid unnecessary disk reads. Default bits_per_key is 10, which
# yields ~1% false positive rate. Larger bits_per_key values will reduce false positive
# rate, but increase memory usage and space amplification.
# bloom-filter-bits-per-key = 10
# false means one bloom filter per sst file, true means every block has a corresponding bloom filter
# block-based-bloom-filter = false
# level0-file-num-compaction-trigger = 4
# Soft limit on number of level-0 files. We start slowing down writes at this point.
# level0-slowdown-writes-trigger = 20
# Maximum number of level-0 files. We stop writes at this point.
# level0-stop-writes-trigger = 36
# Amount of data to build up in memory (backed by an unsorted log
# on disk) before converting to a sorted on-disk file.
# write-buffer-size = "128MB"
# The maximum number of write buffers that are built up in memory.
# max-write-buffer-number = 5
# The minimum number of write buffers that will be merged together
# before writing to storage.
# min-write-buffer-number-to-merge = 1
# Control maximum total data size for base level (level 1).
# max-bytes-for-level-base = "512MB"
# Target file size for compaction.
# target-file-size-base = "8MB"
# Max bytes for compaction.max_compaction_bytes
# max-compaction-bytes = "2GB"
# There are four different algorithms to pick files to compact.
# 0 : ByCompensatedSize
# 1 : OldestLargestSeqFirst
# 2 : OldestSmallestSeqFirst
# 3 : MinOverlappingRatio
# compaction-pri = 3
# block-cache used to cache uncompressed blocks, big block-cache can speed up read.
# in normal cases should tune to 30%-50% system's total memory.
# block-cache-size = "1GB"
# Indicating if we'd put index/filter blocks to the block cache.
# If not specified, each "table reader" object will pre-load index/filter block
# during table initialization.
# cache-index-and-filter-blocks = true
# Pin level0 filter and index blocks in cache.
# pin-l0-filter-and-index-blocks = true
# Enable read amplification statistics.
# value => memory usage (percentage of loaded blocks memory)
# 1 => 12.50 %
# 2 => 06.25 %
# 4 => 03.12 %
# 8 => 01.56 %
# 16 => 00.78 %
# read-amp-bytes-per-bit = 0
# Pick target size of each level dynamically.
# dynamic-level-bytes = true
# Options for Column Family write
# Column Family write used to store commit information in MVCC model
[rocksdb.writecf]
# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]
# block-size = "64KB"
# write-buffer-size = "128MB"
# max-write-buffer-number = 5
# min-write-buffer-number-to-merge = 1
# max-bytes-for-level-base = "512MB"
# target-file-size-base = "8MB"
# in normal cases should tune to 10%-30% system's total memory.
# block-cache-size = "256MB"
# level0-file-num-compaction-trigger = 4
# level0-slowdown-writes-trigger = 20
# level0-stop-writes-trigger = 36
# cache-index-and-filter-blocks = true
# pin-l0-filter-and-index-blocks = true
# compaction-pri = 3
# read-amp-bytes-per-bit = 0
# dynamic-level-bytes = true
[rocksdb.lockcf]
# compression-per-level = ["no", "no", "no", "no", "no", "no", "no"]
# block-size = "16KB"
# write-buffer-size = "128MB"
# max-write-buffer-number = 5
# min-write-buffer-number-to-merge = 1
# max-bytes-for-level-base = "128MB"
# target-file-size-base = "8MB"
# block-cache-size = "256MB"
# level0-file-num-compaction-trigger = 1
# level0-slowdown-writes-trigger = 20
# level0-stop-writes-trigger = 36
# cache-index-and-filter-blocks = true
# pin-l0-filter-and-index-blocks = true
# compaction-pri = 0
# read-amp-bytes-per-bit = 0
# dynamic-level-bytes = true
[raftdb]
# max-sub-compactions = 1
max-open-files = 1024
# max-manifest-file-size = "20MB"
# create-if-missing = true
# enable-statistics = true
# stats-dump-period = "10m"
# compaction-readahead-size = 0
# writable-file-max-buffer-size = "1MB"
# use-direct-io-for-flush-and-compaction = false
# enable-pipelined-write = true
# allow-concurrent-memtable-write = false
# bytes-per-sync = "0MB"
# wal-bytes-per-sync = "0KB"
# info-log-max-size = "1GB"
# info-log-roll-time = "0"
# info-log-keep-log-file-num = 10
# info-log-dir = ""
[raftdb.defaultcf]
# compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]
# block-size = "64KB"
# write-buffer-size = "128MB"
# max-write-buffer-number = 5
# min-write-buffer-number-to-merge = 1
# max-bytes-for-level-base = "512MB"
# target-file-size-base = "8MB"
# should tune to 256MB~2GB.
# block-cache-size = "256MB"
# level0-file-num-compaction-trigger = 4
# level0-slowdown-writes-trigger = 20
# level0-stop-writes-trigger = 36
# cache-index-and-filter-blocks = true
# pin-l0-filter-and-index-blocks = true
# compaction-pri = 0
# read-amp-bytes-per-bit = 0
# dynamic-level-bytes = true
[security]
# set the path for certificates. Empty string means disabling secure connections.
# ca-path = ""
# cert-path = ""
# key-path = ""
[import]
# the directory to store importing kv data.
# import-dir = "/tmp/tikv/import"
# number of threads to handle RPC requests.
# num-threads = 8
# stream channel window size, stream will be blocked on channel full.
# stream-channel-window = 128

@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "2.1"
services:
pd:
image: pingcap/pd:latest
ports:
- "2379:2379"
volumes:
- ./config/pd.toml:/pd.toml
command:
- --client-urls=http://0.0.0.0:2379
- --peer-urls=http://0.0.0.0:2380
- --advertise-client-urls=http://192.168.30.157:2379
- --advertise-peer-urls=http://pd:2380
- --initial-cluster=pd=http://pd:2380
- --data-dir=/data/pd
- --config=/pd.toml
- --log-file=/logs/pd.log
restart: on-failure
tikv:
image: pingcap/tikv:latest
ports:
- "20160:20160"
volumes:
- ./config/tikv.toml:/tikv.toml
command:
- --addr=0.0.0.0:20160
- --advertise-addr=192.168.30.157:20160
- --data-dir=/data/tikv
- --pd=pd:2379
- --config=/tikv.toml
- --log-file=/logs/tikv.log
depends_on:
- "pd"
restart: on-failure
tidb:
image: pingcap/tidb:latest
volumes:
- ./config/tidb.toml:/tidb.toml
command:
- --store=tikv
- --path=pd:2379
- --config=/tidb.toml
- --log-file=/logs/tidb.log
- --advertise-address=tidb
depends_on:
- "tikv"
restart: on-failure

@@ -15,10 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level=INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
