[tidb] Add metadata column ITCase and improve the code style

This closes #898.
pull/898/head
Leonard Xu 3 years ago
parent 22a468861a
commit 39a99741b4

@ -253,16 +253,6 @@ under the License.
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-tidb-cdc</artifactId>
<version>${project.version}</version>
<destFileName>tidb-cdc-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</plugin>

@ -221,7 +221,6 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
@Override
public void cancel() {
// TODO: abort pending transactions
try {
if (cdcClient != null) {
cdcClient.close();

@ -84,17 +84,6 @@ public class RowDataTiKVChangeEventDeserializationSchema
row.getValue().toByteArray(), handle, tableInfo));
emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataInsert, out);
} else {
// TODO TiKV cdc client doesn't return old value in PUT event
// if (!row.getOldValue().isEmpty()) {
// out.collect(
// GenericRowData.ofKind(
// RowKind.UPDATE_BEFORE,
// getRowDataFields(
//
// row.getOldValue().toByteArray(),
// handle,
// tableInfo)));
// }
RowData rowDataUpdate =
GenericRowData.ofKind(
RowKind.UPDATE_AFTER,

@ -19,7 +19,7 @@
package com.ververica.cdc.connectors.tidb.table;
/**
* Startup modes for the Oracle CDC Consumer.
* Startup modes for the TiDB CDC Consumer.
*
* @see StartupOptions
*/

@ -22,6 +22,7 @@ import java.util.Objects;
/** TiDB CDC Source startup options. */
public final class StartupOptions {
public final StartupMode startupMode;
/**

@ -98,8 +98,6 @@ public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
// TODO TiKV cdc client doesn't return old value in PUT event
// .addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();

@ -27,6 +27,7 @@ import java.io.Serializable;
/** Emits a row with physical fields and metadata fields. */
public class TiKVAppendMetadataCollector implements Collector<RowData>, Serializable {
private static final long serialVersionUID = 1L;
private final TiKVMetadataConverter[] metadataConverters;

@ -29,21 +29,22 @@ import java.io.Serializable;
@FunctionalInterface
@Internal
public interface TiKVMetadataConverter extends Serializable {
Object read(TiKVRowValue row);
/** TiKV Row Value. */
class TiKVRowValue {
public boolean isKv;
public boolean isSnapshotRecord;
public Kvrpcpb.KvPair kvPair;
public Cdcpb.Event.Row row;
public TiKVRowValue(Kvrpcpb.KvPair kvPair) {
this.isKv = true;
this.isSnapshotRecord = true;
this.kvPair = kvPair;
}
public TiKVRowValue(Cdcpb.Event.Row row) {
this.isKv = false;
this.isSnapshotRecord = false;
this.row = row;
}
}

@ -92,9 +92,9 @@ public class TiKVReadableMetadata {
@Override
public Object read(TiKVRowValue row) {
if (row.isKv) {
// We cannot get ts from KvPair, use default value.
return TimestampData.fromEpochMillis(0);
if (row.isSnapshotRecord) {
// Uses 0L as the operation time of snapshot records.
return TimestampData.fromEpochMillis(0L);
} else {
return TimestampData.fromEpochMillis(row.row.getStartTs());
}

@ -256,7 +256,6 @@ public class TiKVTypeUtils {
break;
case "BigDecimal":
BigDecimal bigDecimal = (BigDecimal) object;
// TODO improve the conversion code
int precision = ((DecimalType) dataType.getLogicalType()).getPrecision();
int scale = ((DecimalType) dataType.getLogicalType()).getScale();
result = DecimalData.fromBigDecimal(bigDecimal, precision, scale);

@ -345,40 +345,83 @@ public class TiDBConnectorITCase extends TiDBTestBase {
result.getJobClient().get().cancel().get();
}
/* @Test
public void testMetadataColumns() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
TiDBTableSource tidbTableSource = (TiDBTableSource) actualSource;
tidbTableSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name", "table_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = tidbTableSource.copy();
TiDBTableSource expectedSource =
new TiDBTableSource(
SCHEMA_WITH_METADATA,
MY_HOSTNAME,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
PD_ADDRESS,
StartupOptions.latest(),
OPTIONS);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
tidbTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
TiKVRichParallelSourceFunction<RowData> sourceFunction =
(TiKVRichParallelSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(sourceFunction, expectedSource.producedDataType);
}*/
@Test
public void testMetadataColumns() throws Exception {
// End-to-end check that the metadata columns 'database_name' (mapped
// explicitly via METADATA FROM) and 'table_name' (mapped implicitly by
// column name) are populated for both snapshot records and subsequent
// change-stream (UPDATE) records read from TiDB.
initializeTidbTable("inventory");
String sourceDDL =
String.format(
"CREATE TABLE tidb_source ("
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " table_name STRING METADATA VIRTUAL,"
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'tidb-cdc',"
+ " 'hostname' = '%s',"
+ " 'tikv.grpc.timeout_in_ms' = '20000',"
+ " 'pd-addresses' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
TIDB.getContainerIpAddress(),
PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT_ORIGIN),
TIDB_USER,
TIDB_PASSWORD,
"inventory",
"products");
// 'values' sink collects emitted rows in memory so they can be read back
// via TestValuesTableFactory below; sink-insert-only=false accepts +U rows.
String sinkDDL =
"CREATE TABLE sink ("
+ " database_name STRING,"
+ " table_name STRING,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " primary key (database_name, table_name, id) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// Submit the INSERT job asynchronously; it keeps running until cancelled.
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
// Wait until all 9 snapshot rows have reached the sink before mutating
// the table, so the UPDATE below is captured from the change stream.
waitForSinkSize("sink", 9);
try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
}
// One more row (+U for id=106) is expected from the change stream.
waitForSinkSize("sink", 10);
List<String> expected =
Arrays.asList(
"+I(inventory,products,101,scooter,Small 2-wheel scooter,3.1400000000)",
"+I(inventory,products,102,car battery,12V car battery,8.1000000000)",
"+I(inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)",
"+I(inventory,products,104,hammer,12oz carpenter's hammer,0.7500000000)",
"+I(inventory,products,105,hammer,14oz carpenter's hammer,0.8750000000)",
"+I(inventory,products,106,hammer,16oz carpenter's hammer,1.0000000000)",
"+I(inventory,products,107,rocks,box of assorted rocks,5.3000000000)",
"+I(inventory,products,108,jacket,water resistent black wind breaker,0.1000000000)",
"+I(inventory,products,109,spare tire,24 inch spare tire,22.2000000000)",
"+U(inventory,products,106,hammer,18oz carpenter hammer,1.0000000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
// Order-insensitive comparison: rows may arrive interleaved across subtasks.
assertEqualsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {

@ -18,7 +18,6 @@
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
@ -29,12 +28,8 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction;
import org.junit.Test;
import java.util.ArrayList;
@ -43,12 +38,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
/** Integration tests for TiDB table source factory. */
public class TiDBTableSourceFactoryITCase {
/** Unit tests for TiDB table source factory. */
public class TiDBTableSourceFactoryTest {
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
@ -162,21 +155,11 @@ public class TiDBTableSourceFactoryITCase {
options),
schema),
new Configuration(),
TiDBTableSourceFactoryITCase.class.getClassLoader(),
TiDBTableSourceFactoryTest.class.getClassLoader(),
false);
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
return createTableSource(SCHEMA, options);
}
private static void assertProducedTypeOfSourceFunction(
TiKVRichParallelSourceFunction<RowData> sourceFunction, DataType expectedProducedType) {
TypeInformation<RowData> producedType = sourceFunction.getProducedType();
assertThat(producedType, instanceOf(InternalTypeInfo.class));
InternalTypeInfo<RowData> rowDataInternalTypeInfo =
(InternalTypeInfo<RowData>) producedType;
DataType producedDataType = rowDataInternalTypeInfo.getDataType();
assertEquals(expectedProducedType.toString(), producedDataType.toString());
}
}

@ -56,7 +56,6 @@ under the License.
<include>com.ververica:flink-connector-tidb-cdc</include>
<include>org.tikv:tikv-client-java</include>
<include>com.google.protobuf:*</include>
<!-- TODO shade grpc dependency with resources -->
<include>io.grpc:*</include>
</includes>
</artifactSet>
@ -67,6 +66,12 @@ under the License.
com.ververica.cdc.connectors.shaded.com.google
</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.io.grpc
</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>

Loading…
Cancel
Save