[tidb][test] Add TiDBTableSourceFactoryTest and TiDBConnectorITCase

pull/898/head
eastfisher 3 years ago committed by Leonard Xu
parent e718445aa5
commit c51a35d7cd

@ -72,8 +72,8 @@ public class TDBSourceOptions {
"Optional startup mode for TiDB CDC consumer, valid enumerations are "
+ "\"initial\", \"latest-offset\"");
public static final ConfigOption<String> TIKV_PD_ADDRESSES =
ConfigOptions.key(ConfigUtils.TIKV_PD_ADDRESSES)
public static final ConfigOption<String> PD_ADDRESSES =
ConfigOptions.key("pd-addresses")
.stringType()
.noDefaultValue()
.withDescription("TiKV cluster's PD address");
@ -114,15 +114,11 @@ public class TDBSourceOptions {
.noDefaultValue()
.withDescription("TiKV GRPC batch delete concurrency");
public static TiConfiguration getTiConfiguration(final Map<String, String> options) {
public static TiConfiguration getTiConfiguration(
final String pdAddrsStr, final Map<String, String> options) {
final Configuration configuration = Configuration.fromMap(options);
final TiConfiguration tiConf =
configuration
.getOptional(TIKV_PD_ADDRESSES)
.map(TiConfiguration::createDefault)
.orElseGet(TiConfiguration::createDefault);
final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);
configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
configuration

@ -98,7 +98,7 @@ public class TiDBTableSource implements ScanTableSource {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
final TiConfiguration tiConf = TDBSourceOptions.getTiConfiguration(options);
final TiConfiguration tiConf = TDBSourceOptions.getTiConfiguration(pdAddresses, options);
try (final TiSession session = TiSession.create(tiConf)) {
final TiTableInfo tableInfo = session.getCatalog().getTable(database, tableName);

@ -26,12 +26,15 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.DATABASE_NAME;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.PD_ADDRESSES;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.SCAN_STARTUP_MODE;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TABLE_NAME;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_DELETE_CONCURRENCY;
@ -40,7 +43,6 @@ import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_PUT_
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_GRPC_TIMEOUT;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_PD_ADDRESSES;
import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.USERNAME;
/** Factory for creating configured instance of {@link TiDBTableSource}. */
@ -59,7 +61,7 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
String password = config.get(PASSWORD);
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
String pdAddresses = config.get(TIKV_PD_ADDRESSES);
String pdAddresses = config.get(PD_ADDRESSES);
StartupOptions startupOptions = getStartupOptions(config);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
@ -72,7 +74,7 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
password,
pdAddresses,
startupOptions,
context.getCatalogTable().getOptions());
TiKVOptions.getTiKVOptions(context.getCatalogTable().getOptions()));
}
@Override
@ -88,7 +90,7 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(PASSWORD);
options.add(DATABASE_NAME);
options.add(TABLE_NAME);
options.add(TIKV_PD_ADDRESSES);
options.add(PD_ADDRESSES);
return options;
}
@ -128,4 +130,31 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
modeString));
}
}
static class TiKVOptions {
private static final String TIKV_OPTIONS_PREFIX = "tikv.";
public static Map<String, String> getTiKVOptions(Map<String, String> properties) {
Map<String, String> tikvOptions = new HashMap<>();
if (hasTiKVOptions(properties)) {
properties.keySet().stream()
.filter(key -> key.startsWith(TIKV_OPTIONS_PREFIX))
.forEach(
key -> {
final String value = properties.get(key);
tikvOptions.put(key, value);
});
}
return tikvOptions;
}
/**
* Decides if the table options contains Debezium client properties that start with prefix
* 'debezium'.
*/
private static boolean hasTiKVOptions(Map<String, String> options) {
return options.keySet().stream().anyMatch(k -> k.startsWith(TIKV_OPTIONS_PREFIX));
}
}
}

@ -73,7 +73,7 @@ public class TiDBConnectorITCase extends TiDBTestBase {
+ " 'connector' = 'tidb-cdc',"
+ " 'hostname' = '%s',"
+ " 'tikv.grpc.timeout_in_ms' = '20000',"
+ " 'tikv.pd.addresses' = '%s',"
+ " 'pd-addresses' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
@ -169,6 +169,185 @@ public class TiDBConnectorITCase extends TiDBTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testDeleteColumn() throws Exception {
initializeTidbTable("inventory");
String sourceDDL =
String.format(
"CREATE TABLE tidb_source ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'tidb-cdc',"
+ " 'hostname' = '%s',"
+ " 'tikv.grpc.timeout_in_ms' = '20000',"
+ " 'pd-addresses' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
getTIDBHost(),
getPDHost() + ":" + getPDPort(),
TIDB_USER,
TIDB_PASSWORD,
"inventory",
"products");
String sinkDDL =
"CREATE TABLE sink ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
// wait for snapshot finished and begin binlog
waitForSinkSize("sink", 9);
try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {
statement.execute("ALTER TABLE products DROP COLUMN description");
statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO products VALUES (default,'jacket',0.2);"); // 110
statement.execute(
"INSERT INTO products VALUES (default,'scooter',5.18);"); // 111
statement.execute(
"UPDATE products SET name='jacket2', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
waitForSinkSize("sink", 15);
List<String> expected =
Arrays.asList(
"+I(101,scooter,Small 2-wheel scooter,3.1400000000)",
"+I(102,car battery,12V car battery,8.1000000000)",
"+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)",
"+I(104,hammer,12oz carpenter's hammer,0.7500000000)",
"+I(105,hammer,14oz carpenter's hammer,0.8750000000)",
"+I(106,hammer,16oz carpenter's hammer,1.0000000000)",
"+I(107,rocks,box of assorted rocks,5.3000000000)",
"+I(108,jacket,water resistent black wind breaker,0.1000000000)",
"+I(109,spare tire,24 inch spare tire,22.2000000000)",
"+U(107,rocks,null,5.1000000000)",
"+I(110,jacket,null,0.2000000000)",
"+I(111,scooter,null,5.1800000000)",
"+U(110,jacket2,null,0.5000000000)",
"+U(111,scooter,null,5.1700000000)",
"-D(111,scooter,null,5.1700000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
@Test
public void testAddColumn() throws Exception {
initializeTidbTable("inventory");
String sourceDDL =
String.format(
"CREATE TABLE tidb_source ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'tidb-cdc',"
+ " 'hostname' = '%s',"
+ " 'tikv.grpc.timeout_in_ms' = '20000',"
+ " 'pd-addresses' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
getTIDBHost(),
getPDHost() + ":" + getPDPort(),
TIDB_USER,
TIDB_PASSWORD,
"inventory",
"products");
String sinkDDL =
"CREATE TABLE sink ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
// wait for snapshot finished and begin binlog
waitForSinkSize("sink", 9);
try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {
statement.execute("ALTER TABLE products ADD COLUMN serialnum INTEGER");
statement.execute(
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2,null);"); // 110
statement.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18,1);");
statement.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
waitForSinkSize("sink", 16);
List<String> expected =
Arrays.asList(
"+I(101,scooter,Small 2-wheel scooter,3.1400000000)",
"+I(102,car battery,12V car battery,8.1000000000)",
"+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)",
"+I(104,hammer,12oz carpenter's hammer,0.7500000000)",
"+I(105,hammer,14oz carpenter's hammer,0.8750000000)",
"+I(106,hammer,16oz carpenter's hammer,1.0000000000)",
"+I(107,rocks,box of assorted rocks,5.3000000000)",
"+I(108,jacket,water resistent black wind breaker,0.1000000000)",
"+I(109,spare tire,24 inch spare tire,22.2000000000)",
"+U(106,hammer,18oz carpenter hammer,1.0000000000)",
"+U(107,rocks,box of assorted rocks,5.1000000000)",
"+I(110,jacket,water resistent white wind breaker,0.2000000000)",
"+I(111,scooter,Big 2-wheel scooter ,5.1800000000)",
"+U(110,jacket,new water resistent white wind breaker,0.5000000000)",
"+U(111,scooter,Big 2-wheel scooter ,5.1700000000)",
"-D(111,scooter,Big 2-wheel scooter ,5.1700000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.FactoryUtil;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
/** Integration tests for TiDB table source factory. */
public class TiDBTableSourceFactoryTest {
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
Arrays.asList(
Column.physical("aaa", DataTypes.INT().notNull()),
Column.physical("bbb", DataTypes.STRING().notNull()),
Column.physical("ccc", DataTypes.DOUBLE()),
Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
Column.physical("eee", DataTypes.TIMESTAMP(3))),
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final String MY_HOSTNAME = "localhost:4000";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
private static final String MY_DATABASE = "myDB";
private static final String MY_TABLE = "myTable";
private static final String PD_ADDRESS = "pd:2379";
private static final Map<String, String> OPTIONS = new HashMap<>();
@Test
public void testCommonProperties() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
TiDBTableSource expectedSource =
new TiDBTableSource(
SCHEMA,
MY_HOSTNAME,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
PD_ADDRESS,
StartupOptions.latest(),
OPTIONS);
assertEquals(expectedSource, actualSource);
}
@Test
public void testOptionalProperties() {
Map<String, String> properties = getAllOptions();
properties.put("tikv.grpc.timeout_in_ms", "20000");
properties.put("tikv.grpc.scan_timeout_in_ms", "20000");
properties.put("tikv.batch_get_concurrency", "4");
properties.put("tikv.batch_put_concurrency", "4");
properties.put("tikv.batch_scan_concurrency", "4");
properties.put("tikv.batch_delete_concurrency", "4");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
Map<String, String> options = new HashMap<>();
options.put("tikv.grpc.timeout_in_ms", "20000");
options.put("tikv.grpc.scan_timeout_in_ms", "20000");
options.put("tikv.batch_get_concurrency", "4");
options.put("tikv.batch_put_concurrency", "4");
options.put("tikv.batch_scan_concurrency", "4");
options.put("tikv.batch_delete_concurrency", "4");
TiDBTableSource expectedSource =
new TiDBTableSource(
SCHEMA,
MY_HOSTNAME,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
PD_ADDRESS,
StartupOptions.latest(),
options);
assertEquals(expectedSource, actualSource);
}
private Map<String, String> getAllOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "tidb-cdc");
options.put("hostname", MY_HOSTNAME);
options.put("database-name", MY_DATABASE);
options.put("table-name", MY_TABLE);
options.put("username", MY_USERNAME);
options.put("password", MY_PASSWORD);
options.put("pd-addresses", PD_ADDRESS);
options.put("scan.startup.mode", "latest-offset");
return options;
}
private static DynamicTableSource createTableSource(
ResolvedSchema schema, Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new ResolvedCatalogTable(
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(schema).build(),
"mock source",
new ArrayList<>(),
options),
schema),
new Configuration(),
TiDBTableSourceFactoryTest.class.getClassLoader(),
false);
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
return createTableSource(SCHEMA, options);
}
}
Loading…
Cancel
Save