From 39a99741b464dade0361452a4a0a520ebbcd40c0 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Wed, 16 Mar 2022 23:23:08 +0800 Subject: [PATCH] [tidb] Add metadata column ITCase and improve the code style This closes #898. --- flink-cdc-e2e-tests/pom.xml | 10 -- .../tidb/TiKVRichParallelSourceFunction.java | 1 - ...aTiKVChangeEventDeserializationSchema.java | 11 -- .../connectors/tidb/table/StartupMode.java | 2 +- .../connectors/tidb/table/StartupOptions.java | 1 + .../tidb/table/TiDBTableSource.java | 2 - .../table/TiKVAppendMetadataCollector.java | 1 + .../tidb/table/TiKVMetadataConverter.java | 7 +- .../tidb/table/TiKVReadableMetadata.java | 6 +- .../tidb/table/utils/TiKVTypeUtils.java | 1 - .../tidb/table/TiDBConnectorITCase.java | 111 ++++++++++++------ ...e.java => TiDBTableSourceFactoryTest.java} | 23 +--- flink-sql-connector-tidb-cdc/pom.xml | 7 +- 13 files changed, 96 insertions(+), 87 deletions(-) rename flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/{TiDBTableSourceFactoryITCase.java => TiDBTableSourceFactoryTest.java} (85%) diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml index 09ab29084..3c6d3d198 100644 --- a/flink-cdc-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/pom.xml @@ -253,16 +253,6 @@ under the License. 
${project.build.directory}/dependencies - - - com.ververica - flink-sql-connector-tidb-cdc - ${project.version} - tidb-cdc-connector.jar - jar - ${project.build.directory}/dependencies - - diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java index d7b846729..27d87ffb7 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java @@ -221,7 +221,6 @@ public class TiKVRichParallelSourceFunction extends RichParallelSourceFunctio @Override public void cancel() { - // TODO: abort pending transactions try { if (cdcClient != null) { cdcClient.close(); diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java index 7b15b3681..3d4b212e8 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java @@ -84,17 +84,6 @@ public class RowDataTiKVChangeEventDeserializationSchema row.getValue().toByteArray(), handle, tableInfo)); emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataInsert, out); } else { - // TODO TiKV cdc client doesn't return old value in PUT event - // if (!row.getOldValue().isEmpty()) { - // out.collect( - // GenericRowData.ofKind( - // RowKind.UPDATE_BEFORE, - // getRowDataFields( - // - // row.getOldValue().toByteArray(), - // handle, - // tableInfo))); - // } RowData rowDataUpdate = 
GenericRowData.ofKind( RowKind.UPDATE_AFTER, diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/StartupMode.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/StartupMode.java index 4ffd2e879..9838f5713 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/StartupMode.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/StartupMode.java @@ -19,7 +19,7 @@ package com.ververica.cdc.connectors.tidb.table; /** - * Startup modes for the Oracle CDC Consumer. + * Startup modes for the TiDB CDC Consumer. * * @see StartupOptions */ diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/StartupOptions.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/StartupOptions.java index 951cd3c27..2c92f1fce 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/StartupOptions.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/StartupOptions.java @@ -22,6 +22,7 @@ import java.util.Objects; /** TiDB CDC Source startup options. 
*/ public final class StartupOptions { + public final StartupMode startupMode; /** diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSource.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSource.java index 21c4d870f..c06bdfbd0 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSource.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSource.java @@ -98,8 +98,6 @@ public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata public ChangelogMode getChangelogMode() { return ChangelogMode.newBuilder() .addContainedKind(RowKind.INSERT) - // TODO TiKV cdc client doesn't return old value in PUT event - // .addContainedKind(RowKind.UPDATE_BEFORE) .addContainedKind(RowKind.UPDATE_AFTER) .addContainedKind(RowKind.DELETE) .build(); diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVAppendMetadataCollector.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVAppendMetadataCollector.java index b9af3199d..4675867d1 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVAppendMetadataCollector.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVAppendMetadataCollector.java @@ -27,6 +27,7 @@ import java.io.Serializable; /** Emits a row with physical fields and metadata fields. 
*/ public class TiKVAppendMetadataCollector implements Collector, Serializable { + private static final long serialVersionUID = 1L; private final TiKVMetadataConverter[] metadataConverters; diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVMetadataConverter.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVMetadataConverter.java index 12b60fd5d..fc9394e68 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVMetadataConverter.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVMetadataConverter.java @@ -29,21 +29,22 @@ import java.io.Serializable; @FunctionalInterface @Internal public interface TiKVMetadataConverter extends Serializable { + Object read(TiKVRowValue row); /** TiKV Row Value. */ class TiKVRowValue { - public boolean isKv; + public boolean isSnapshotRecord; public Kvrpcpb.KvPair kvPair; public Cdcpb.Event.Row row; public TiKVRowValue(Kvrpcpb.KvPair kvPair) { - this.isKv = true; + this.isSnapshotRecord = true; this.kvPair = kvPair; } public TiKVRowValue(Cdcpb.Event.Row row) { - this.isKv = false; + this.isSnapshotRecord = false; this.row = row; } } diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVReadableMetadata.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVReadableMetadata.java index 1a9f33ee0..b25b8900a 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVReadableMetadata.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVReadableMetadata.java @@ -92,9 +92,9 @@ public class TiKVReadableMetadata { @Override public Object read(TiKVRowValue row) { - if (row.isKv) { - // We cannot get ts from KvPair, use default value. 
- return TimestampData.fromEpochMillis(0); + if (row.isSnapshotRecord) { + // Uses 0L as the operation time of snapshot records. + return TimestampData.fromEpochMillis(0L); } else { return TimestampData.fromEpochMillis(row.row.getStartTs()); } diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/utils/TiKVTypeUtils.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/utils/TiKVTypeUtils.java index bd9ab1db9..f56ff448a 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/utils/TiKVTypeUtils.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/utils/TiKVTypeUtils.java @@ -256,7 +256,6 @@ public class TiKVTypeUtils { break; case "BigDecimal": BigDecimal bigDecimal = (BigDecimal) object; - // TODO improve the conversion code int precision = ((DecimalType) dataType.getLogicalType()).getPrecision(); int scale = ((DecimalType) dataType.getLogicalType()).getScale(); result = DecimalData.fromBigDecimal(bigDecimal, precision, scale); diff --git a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBConnectorITCase.java b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBConnectorITCase.java index 3c8441ce2..0db15772d 100644 --- a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBConnectorITCase.java +++ b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBConnectorITCase.java @@ -345,40 +345,83 @@ public class TiDBConnectorITCase extends TiDBTestBase { result.getJobClient().get().cancel().get(); } - /* @Test - public void testMetadataColumns() { - Map properties = getAllOptions(); - - // validation for source - DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties); - TiDBTableSource tidbTableSource = (TiDBTableSource) actualSource; - tidbTableSource.applyReadableMetadata( 
- Arrays.asList("op_ts", "database_name", "table_name"), - SCHEMA_WITH_METADATA.toSourceRowDataType()); - actualSource = tidbTableSource.copy(); - TiDBTableSource expectedSource = - new TiDBTableSource( - SCHEMA_WITH_METADATA, - MY_HOSTNAME, - MY_DATABASE, - MY_TABLE, - MY_USERNAME, - MY_PASSWORD, - PD_ADDRESS, - StartupOptions.latest(), - OPTIONS); - expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); - expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name"); - - assertEquals(expectedSource, actualSource); - - ScanTableSource.ScanRuntimeProvider provider = - tidbTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); - TiKVRichParallelSourceFunction sourceFunction = - (TiKVRichParallelSourceFunction) - ((SourceFunctionProvider) provider).createSourceFunction(); - assertProducedTypeOfSourceFunction(sourceFunction, expectedSource.producedDataType); - }*/ + @Test + public void testMetadataColumns() throws Exception { + initializeTidbTable("inventory"); + + String sourceDDL = + String.format( + "CREATE TABLE tidb_source (" + + " db_name STRING METADATA FROM 'database_name' VIRTUAL," + + " table_name STRING METADATA VIRTUAL," + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'tidb-cdc'," + + " 'hostname' = '%s'," + + " 'tikv.grpc.timeout_in_ms' = '20000'," + + " 'pd-addresses' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'" + + ")", + TIDB.getContainerIpAddress(), + PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT_ORIGIN), + TIDB_USER, + TIDB_PASSWORD, + "inventory", + "products"); + + String sinkDDL = + "CREATE TABLE sink (" + + " database_name STRING," + + " table_name STRING," + + " `id` DECIMAL(20, 0) NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," 
+ + " primary key (database_name, table_name, id) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source"); + + // wait for snapshot finished and begin binlog + waitForSinkSize("sink", 9); + + try (Connection connection = getJdbcConnection("inventory"); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + } + + waitForSinkSize("sink", 10); + + List expected = + Arrays.asList( + "+I(inventory,products,101,scooter,Small 2-wheel scooter,3.1400000000)", + "+I(inventory,products,102,car battery,12V car battery,8.1000000000)", + "+I(inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)", + "+I(inventory,products,104,hammer,12oz carpenter's hammer,0.7500000000)", + "+I(inventory,products,105,hammer,14oz carpenter's hammer,0.8750000000)", + "+I(inventory,products,106,hammer,16oz carpenter's hammer,1.0000000000)", + "+I(inventory,products,107,rocks,box of assorted rocks,5.3000000000)", + "+I(inventory,products,108,jacket,water resistent black wind breaker,0.1000000000)", + "+I(inventory,products,109,spare tire,24 inch spare tire,22.2000000000)", + "+U(inventory,products,106,hammer,18oz carpenter hammer,1.0000000000)"); + List actual = TestValuesTableFactory.getRawResults("sink"); + assertEqualsInAnyOrder(expected, actual); + result.getJobClient().get().cancel().get(); + } private static void waitForSinkSize(String sinkName, int expectedSize) throws InterruptedException { diff --git a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryITCase.java b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java 
similarity index 85% rename from flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryITCase.java rename to flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java index cc8d334d9..a58e58466 100644 --- a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryITCase.java +++ b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java @@ -18,7 +18,6 @@ package com.ververica.cdc.connectors.tidb.table; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; @@ -29,12 +28,8 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -import org.apache.flink.table.types.DataType; -import com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction; import org.junit.Test; import java.util.ArrayList; @@ -43,12 +38,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -/** Integration tests for TiDB table source factory. */ -public class TiDBTableSourceFactoryITCase { +/** Unit tests for TiDB table source factory. 
*/ +public class TiDBTableSourceFactoryTest { private static final ResolvedSchema SCHEMA = new ResolvedSchema( @@ -162,21 +155,11 @@ public class TiDBTableSourceFactoryITCase { options), schema), new Configuration(), - TiDBTableSourceFactoryITCase.class.getClassLoader(), + TiDBTableSourceFactoryTest.class.getClassLoader(), false); } private static DynamicTableSource createTableSource(Map options) { return createTableSource(SCHEMA, options); } - - private static void assertProducedTypeOfSourceFunction( - TiKVRichParallelSourceFunction sourceFunction, DataType expectedProducedType) { - TypeInformation producedType = sourceFunction.getProducedType(); - assertThat(producedType, instanceOf(InternalTypeInfo.class)); - InternalTypeInfo rowDataInternalTypeInfo = - (InternalTypeInfo) producedType; - DataType producedDataType = rowDataInternalTypeInfo.getDataType(); - assertEquals(expectedProducedType.toString(), producedDataType.toString()); - } } diff --git a/flink-sql-connector-tidb-cdc/pom.xml b/flink-sql-connector-tidb-cdc/pom.xml index 0cceac91d..a1ebe55c5 100644 --- a/flink-sql-connector-tidb-cdc/pom.xml +++ b/flink-sql-connector-tidb-cdc/pom.xml @@ -56,7 +56,6 @@ under the License. com.ververica:flink-connector-tidb-cdc org.tikv:tikv-client-java com.google.protobuf:* - io.grpc:* @@ -67,6 +66,12 @@ under the License. com.ververica.cdc.connectors.shaded.com.google + + io.grpc + + com.ververica.cdc.connectors.shaded.io.grpc + +