diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java index cdf58e684..7b15b3681 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java @@ -38,6 +38,7 @@ import static org.tikv.common.codec.TableCodec.decodeObjects; * RowData}. */ public class RowDataTiKVChangeEventDeserializationSchema + extends RowDataTiKVEventDeserializationSchemaBase implements TiKVChangeEventDeserializationSchema { private static final long serialVersionUID = 1L; @@ -49,7 +50,11 @@ public class RowDataTiKVChangeEventDeserializationSchema private final TiTableInfo tableInfo; public RowDataTiKVChangeEventDeserializationSchema( - TypeInformation resultTypeInfo, TiTableInfo tableInfo) { + TypeInformation resultTypeInfo, + TiTableInfo tableInfo, + TiKVMetadataConverter[] metadataConverters) { + + super(metadataConverters); this.resultTypeInfo = resultTypeInfo; this.tableInfo = tableInfo; } @@ -60,23 +65,24 @@ public class RowDataTiKVChangeEventDeserializationSchema final long handle = rowKey.getHandle(); switch (row.getOpType()) { case DELETE: - out.collect( + RowData rowDataDelete = GenericRowData.ofKind( RowKind.DELETE, getObjectsWithDataTypes( decodeObjects( row.getOldValue().toByteArray(), handle, tableInfo), - tableInfo))); + tableInfo)); + emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataDelete, out); break; case PUT: try { if (row.getOldValue() == null) { - out.collect( + RowData rowDataInsert = GenericRowData.ofKind( RowKind.INSERT, getRowDataFields( - row.getValue().toByteArray(), handle, tableInfo))); - + row.getValue().toByteArray(), handle, 
tableInfo)); + emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataInsert, out); } else { // TODO TiKV cdc client doesn't return old value in PUT event // if (!row.getOldValue().isEmpty()) { @@ -89,11 +95,12 @@ public class RowDataTiKVChangeEventDeserializationSchema // handle, // tableInfo))); // } - out.collect( + RowData rowDataUpdate = GenericRowData.ofKind( RowKind.UPDATE_AFTER, getRowDataFields( - row.getValue().toByteArray(), handle, tableInfo))); + row.getValue().toByteArray(), handle, tableInfo)); + emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataUpdate, out); } break; } catch (final RuntimeException e) { diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVEventDeserializationSchemaBase.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVEventDeserializationSchemaBase.java new file mode 100644 index 000000000..84652004e --- /dev/null +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVEventDeserializationSchemaBase.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.ververica.cdc.connectors.tidb.table;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class of deserialization schema from TiKV RowValue (Snapshot or Change Event) to Flink
+ * Table/SQL internal data structure {@link RowData}.
+ *
+ * <p>Subclasses build the physical row and pass it to {@link #emit}, which either forwards it
+ * unchanged (no metadata converters) or routes it through a {@link TiKVAppendMetadataCollector}.
+ */
+public class RowDataTiKVEventDeserializationSchemaBase implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Whether the deserializer needs to handle metadata columns. */
+    private final boolean hasMetadata;
+
+    /**
+     * A wrapped output collector which is used to append metadata columns after physical columns.
+     */
+    private final TiKVAppendMetadataCollector appendMetadataCollector;
+
+    /**
+     * Creates the base schema.
+     *
+     * @param metadataConverters converters for the requested metadata columns; must not be {@code
+     *     null}, and an empty array disables metadata handling
+     */
+    public RowDataTiKVEventDeserializationSchemaBase(TiKVMetadataConverter[] metadataConverters) {
+        this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+        this.appendMetadataCollector = new TiKVAppendMetadataCollector(metadataConverters);
+    }
+
+    /**
+     * Emits {@code physicalRow} to {@code collector}, appending metadata columns when at least one
+     * metadata converter was configured.
+     *
+     * @param inRecord source record handed to the metadata converters
+     * @param physicalRow row holding the physical columns
+     * @param collector downstream collector that receives the resulting row
+     */
+    public void emit(
+            TiKVMetadataConverter.TiKVRowValue inRecord,
+            RowData physicalRow,
+            Collector<RowData> collector) {
+        if (!hasMetadata) {
+            // No metadata requested: the physical row is already the complete output.
+            collector.collect(physicalRow);
+            return;
+        }
+
+        // The wrapper reads both fields inside collect(), so they are set right before the call.
+        appendMetadataCollector.row = inRecord;
+        appendMetadataCollector.outputCollector = collector;
+        appendMetadataCollector.collect(physicalRow);
+    }
+}
diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVSnapshotEventDeserializationSchema.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVSnapshotEventDeserializationSchema.java
index 1811b62ea..435c7fce7 100644
--- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVSnapshotEventDeserializationSchema.java
+++ 
b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/RowDataTiKVSnapshotEventDeserializationSchema.java @@ -37,6 +37,7 @@ import static org.tikv.common.codec.TableCodec.decodeObjects; * RowData}. */ public class RowDataTiKVSnapshotEventDeserializationSchema + extends RowDataTiKVEventDeserializationSchemaBase implements TiKVSnapshotEventDeserializationSchema { private static final long serialVersionUID = 1L; @@ -48,7 +49,11 @@ public class RowDataTiKVSnapshotEventDeserializationSchema private final TiTableInfo tableInfo; public RowDataTiKVSnapshotEventDeserializationSchema( - TypeInformation resultTypeInfo, TiTableInfo tableInfo) { + TypeInformation resultTypeInfo, + TiTableInfo tableInfo, + TiKVMetadataConverter[] metadataConverters) { + + super(metadataConverters); this.resultTypeInfo = resultTypeInfo; this.tableInfo = tableInfo; } @@ -57,10 +62,12 @@ public class RowDataTiKVSnapshotEventDeserializationSchema public void deserialize(KvPair record, Collector out) throws Exception { final RowKey rowKey = RowKey.decode(record.getKey().toByteArray()); final long handle = rowKey.getHandle(); - out.collect( + + RowData rowData = GenericRowData.ofKind( RowKind.INSERT, - getRowDataFields(record.getValue().toByteArray(), handle, tableInfo))); + getRowDataFields(record.getValue().toByteArray(), handle, tableInfo)); + emit(new TiKVMetadataConverter.TiKVRowValue(record), rowData, out); } private static Object[] getRowDataFields(byte[] value, Long handle, TiTableInfo tableInfo) { diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSource.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSource.java index 44e9f2e44..21c4d870f 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSource.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSource.java @@ -24,6 +24,7 @@ 
import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; @@ -35,8 +36,12 @@ import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; import org.tikv.common.meta.TiTableInfo; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -44,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * A {@link DynamicTableSource} that describes how to create a TiDB binlog from a logical * description. */ -public class TiDBTableSource implements ScanTableSource { +public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata { private final ResolvedSchema physicalSchema; private final String hostname; @@ -63,6 +68,9 @@ public class TiDBTableSource implements ScanTableSource { /** Data type that describes the final output of the source. */ protected DataType producedDataType; + /** Metadata that is appended at the end of a physical source row. 
*/
+    protected List<String> metadataKeys;
+
     public TiDBTableSource(
             ResolvedSchema physicalSchema,
             String hostname,
@@ -83,6 +91,7 @@ public class TiDBTableSource implements ScanTableSource {
         this.startupOptions = startupOptions;
         this.producedDataType = physicalSchema.toPhysicalRowDataType();
         this.options = options;
+        this.metadataKeys = Collections.emptyList();
     }
 
     @Override
@@ -103,10 +112,13 @@ public class TiDBTableSource implements ScanTableSource {
         final TiTableInfo tableInfo = session.getCatalog().getTable(database, tableName);
 
         TypeInformation typeInfo = scanContext.createTypeInformation(producedDataType);
+        TiKVMetadataConverter[] metadataConverters = getMetadataConverters();
         RowDataTiKVSnapshotEventDeserializationSchema snapshotEventDeserializationSchema =
-                new RowDataTiKVSnapshotEventDeserializationSchema(typeInfo, tableInfo);
+                new RowDataTiKVSnapshotEventDeserializationSchema(
+                        typeInfo, tableInfo, metadataConverters);
         RowDataTiKVChangeEventDeserializationSchema changeEventDeserializationSchema =
-                new RowDataTiKVChangeEventDeserializationSchema(typeInfo, tableInfo);
+                new RowDataTiKVChangeEventDeserializationSchema(
+                        typeInfo, tableInfo, metadataConverters);
 
         TiDBSource.Builder builder =
                 TiDBSource.builder()
@@ -140,9 +152,40 @@ public class TiDBTableSource implements ScanTableSource {
                         startupOptions,
                         options);
         source.producedDataType = producedDataType;
+        source.metadataKeys = metadataKeys;
         return source;
     }
 
+    /**
+     * Resolves the requested {@link #metadataKeys} to their converters, keeping the key order.
+     *
+     * @return one converter per requested key; an empty array when no metadata was requested
+     * @throws IllegalStateException if a requested key is not a supported metadata column
+     */
+    private TiKVMetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new TiKVMetadataConverter[0];
+        }
+
+        // Index the supported metadata once instead of rebuilding it for every requested key.
+        final Map<String, TiKVMetadataConverter> convertersByKey =
+                Stream.of(TiKVReadableMetadata.createTiKVReadableMetadata(database, tableName))
+                        .collect(
+                                Collectors.toMap(
+                                        TiKVReadableMetadata::getKey,
+                                        TiKVReadableMetadata::getConverter));
+        final TiKVMetadataConverter[] converters = new TiKVMetadataConverter[metadataKeys.size()];
+        for (int i = 0; i < converters.length; i++) {
+            final String key = metadataKeys.get(i);
+            final TiKVMetadataConverter converter = convertersByKey.get(key);
+            if (converter == null) {
+                throw new IllegalStateException("Unsupported metadata key: " + key);
+            }
+            converters[i] = converter;
+        }
+        return converters;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -181,4 +224,20 @@ public class TiDBTableSource implements ScanTableSource {
     public String asSummaryString() {
         return "TiDB-CDC";
     }
+
+    /** Lists every metadata key this source can expose, mapped to the key's data type. */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(TiKVReadableMetadata.createTiKVReadableMetadata(database, tableName))
+                .collect(
+                        Collectors.toMap(
+                                TiKVReadableMetadata::getKey, TiKVReadableMetadata::getDataType));
+    }
+
+    /** Stores the requested metadata keys and the produced data type on this source. */
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
 }
diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVAppendMetadataCollector.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVAppendMetadataCollector.java
new file mode 100644
index 000000000..b9af3199d
--- /dev/null
+++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVAppendMetadataCollector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.tidb.table;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Emits a row with physical fields and metadata fields.
+ *
+ * <p>The {@link #row} and {@link #outputCollector} fields are transient and are read by {@link
+ * #collect(RowData)}, so both must be assigned before that method is called.
+ */
+public class TiKVAppendMetadataCollector implements Collector<RowData>, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Converters producing one metadata field each, in output order. */
+    private final TiKVMetadataConverter[] metadataConverters;
+
+    /** Source record that is passed to every metadata converter. */
+    public transient TiKVMetadataConverter.TiKVRowValue row;
+
+    /** Collector that receives the joined physical and metadata row. */
+    public transient Collector<RowData> outputCollector;
+
+    /**
+     * Creates a collector that appends one field per converter.
+     *
+     * @param metadataConverters converters applied, in order, to build the metadata fields
+     */
+    public TiKVAppendMetadataCollector(TiKVMetadataConverter[] metadataConverters) {
+        this.metadataConverters = metadataConverters;
+    }
+
+    /**
+     * Reads every metadata value from {@link #row}, appends them after {@code physicalRow} and
+     * forwards the joined row, carrying the row kind of {@code physicalRow}, to {@link
+     * #outputCollector}.
+     */
+    @Override
+    public void collect(RowData physicalRow) {
+        final GenericRowData metaRow = new GenericRowData(metadataConverters.length);
+        for (int i = 0; i < metadataConverters.length; i++) {
+            metaRow.setField(i, metadataConverters[i].read(row));
+        }
+        outputCollector.collect(
+                new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow));
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}
diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVMetadataConverter.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVMetadataConverter.java
new file mode 100644
index 000000000..12b60fd5d
--- /dev/null
+++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVMetadataConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.tidb.table;
+
+import org.apache.flink.annotation.Internal;
+
+import org.tikv.kvproto.Cdcpb;
+import org.tikv.kvproto.Kvrpcpb;
+
+import java.io.Serializable;
+
+/** A converter converts TiKV Row metadata into Flink internal data structures. */
+@FunctionalInterface
+@Internal
+public interface TiKVMetadataConverter extends Serializable {
+    /**
+     * Reads one metadata value from the given record.
+     *
+     * @param row wrapper around either a {@code KvPair} or a change-event {@code Row}
+     * @return the metadata value to place in the produced row
+     */
+    Object read(TiKVRowValue row);
+
+    /**
+     * TiKV Row Value: holds either a {@link Kvrpcpb.KvPair} or a {@link Cdcpb.Event.Row}, with
+     * {@link #isKv} telling which of the two is set.
+     */
+    class TiKVRowValue {
+        /** {@code true} when built from a {@code KvPair}, {@code false} when built from a row. */
+        public final boolean isKv;
+
+        /** The wrapped key-value pair; {@code null} when {@link #isKv} is {@code false}. */
+        public final Kvrpcpb.KvPair kvPair;
+
+        /** The wrapped change-event row; {@code null} when {@link #isKv} is {@code true}. */
+        public final Cdcpb.Event.Row row;
+
+        public TiKVRowValue(Kvrpcpb.KvPair kvPair) {
+            this.isKv = true;
+            this.kvPair = kvPair;
+            this.row = null;
+        }
+
+        public TiKVRowValue(Cdcpb.Event.Row row) {
+            this.isKv = false;
+            this.kvPair = null;
+            this.row = row;
+        }
+    }
+}
diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVReadableMetadata.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVReadableMetadata.java
new file mode 100644
index 000000000..1a9f33ee0
--- /dev/null
+++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiKVReadableMetadata.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.tidb.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Defines the supported metadata columns for {@link TiDBTableSource}. */
+public class TiKVReadableMetadata {
+
+    /**
+     * Number of low-order bits of a TiDB TSO that hold the logical counter; the remaining
+     * high-order bits hold the physical time in epoch milliseconds.
+     */
+    private static final int TSO_LOGICAL_BITS = 18;
+
+    private final String key;
+
+    private final DataType dataType;
+
+    private final TiKVMetadataConverter converter;
+
+    TiKVReadableMetadata(String key, DataType dataType, TiKVMetadataConverter converter) {
+        this.key = key;
+        this.dataType = dataType;
+        this.converter = converter;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public DataType getDataType() {
+        return dataType;
+    }
+
+    public TiKVMetadataConverter getConverter() {
+        return converter;
+    }
+
+    /** Name of the table that contains the row. */
+    public static TiKVReadableMetadata createTableNameMetadata(String tableName) {
+        return new TiKVReadableMetadata(
+                "table_name",
+                DataTypes.STRING().notNull(),
+                new TiKVMetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(TiKVRowValue row) {
+                        return StringData.fromString(tableName);
+                    }
+                });
+    }
+
+    /** Name of the database that contains the row. */
+    public static TiKVReadableMetadata createDatabaseNameMetadata(String database) {
+        return new TiKVReadableMetadata(
+                "database_name",
+                DataTypes.STRING().notNull(),
+                new TiKVMetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(TiKVRowValue row) {
+                        return StringData.fromString(database);
+                    }
+                });
+    }
+
+    /**
+     * Operation time of the row, taken from the start timestamp of a change event. Records built
+     * from a {@code KvPair} carry no timestamp and report the epoch (0) instead.
+     */
+    public static TiKVReadableMetadata createOpTsMetadata() {
+        return new TiKVReadableMetadata(
+                "op_ts",
+                DataTypes.TIMESTAMP_LTZ(3).notNull(),
+                new TiKVMetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(TiKVRowValue row) {
+                        if (row.isKv) {
+                            // We cannot get ts from KvPair, use default value.
+                            return TimestampData.fromEpochMillis(0);
+                        } else {
+                            // start_ts is a TSO, not wall-clock milliseconds: drop the logical
+                            // counter bits to recover the physical epoch-millisecond part.
+                            return TimestampData.fromEpochMillis(
+                                    row.row.getStartTs() >>> TSO_LOGICAL_BITS);
+                        }
+                    }
+                });
+    }
+
+    /**
+     * Creates all supported metadata columns, in the order database name, table name, operation
+     * time.
+     *
+     * @param database database name reported by the {@code database_name} column
+     * @param tableName table name reported by the {@code table_name} column
+     */
+    public static TiKVReadableMetadata[] createTiKVReadableMetadata(
+            String database, String tableName) {
+        List<TiKVReadableMetadata> list = new ArrayList<>();
+        list.add(createDatabaseNameMetadata(database));
+        list.add(createTableNameMetadata(tableName));
+        list.add(createOpTsMetadata());
+        return list.toArray(new TiKVReadableMetadata[0]);
+    }
+}
diff --git a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryITCase.java
similarity index 57%
rename from flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java
rename to flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryITCase.java
index 4162d8b35..4df35513d 100644
--- a/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java
+++ b/flink-connector-tidb-cdc/src/test/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactoryITCase.java
@@ -18,6 +18,7 @@
 
 package 
com.ververica.cdc.connectors.tidb.table; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; @@ -28,19 +29,30 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import com.ververica.cdc.connectors.tidb.TiDBTestBase; +import com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; /** Integration tests for TiDB table source factory. 
*/ -public class TiDBTableSourceFactoryTest { +public class TiDBTableSourceFactoryITCase extends TiDBTestBase { private static final ResolvedSchema SCHEMA = new ResolvedSchema( @@ -53,12 +65,26 @@ public class TiDBTableSourceFactoryTest { new ArrayList<>(), UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa"))); - private static final String MY_HOSTNAME = "localhost:4000"; - private static final String MY_USERNAME = "flinkuser"; - private static final String MY_PASSWORD = "flinkpw"; - private static final String MY_DATABASE = "myDB"; - private static final String MY_TABLE = "myTable"; - private static final String PD_ADDRESS = "pd:2379"; + private static final ResolvedSchema SCHEMA_WITH_METADATA = + new ResolvedSchema( + Arrays.asList( + Column.physical("id", DataTypes.BIGINT().notNull()), + Column.physical("name", DataTypes.STRING()), + Column.physical("count", DataTypes.DECIMAL(38, 18)), + Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true), + Column.metadata( + "database_name", DataTypes.STRING(), "database_name", true), + Column.metadata("table_name", DataTypes.STRING(), "table_name", true), + Column.metadata("op_ts", DataTypes.TIMESTAMP(), "op_ts", true)), + Collections.emptyList(), + UniqueConstraint.primaryKey("pk", Collections.singletonList("id"))); + + private static final String MY_HOSTNAME = "tidb0:4000"; + private static final String MY_USERNAME = "root"; + private static final String MY_PASSWORD = ""; + private static final String MY_DATABASE = "inventory"; + private static final String MY_TABLE = "products"; + private static final String PD_ADDRESS = "pd0:2379"; private static final Map OPTIONS = new HashMap<>(); @Test @@ -114,6 +140,41 @@ public class TiDBTableSourceFactoryTest { assertEquals(expectedSource, actualSource); } + @Test + public void testMetadataColumns() { + Map properties = getAllOptions(); + + // validation for source + DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties); + 
TiDBTableSource tidbTableSource = (TiDBTableSource) actualSource; + tidbTableSource.applyReadableMetadata( + Arrays.asList("op_ts", "database_name", "table_name"), + SCHEMA_WITH_METADATA.toSourceRowDataType()); + actualSource = tidbTableSource.copy(); + TiDBTableSource expectedSource = + new TiDBTableSource( + SCHEMA_WITH_METADATA, + MY_HOSTNAME, + MY_DATABASE, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + PD_ADDRESS, + StartupOptions.latest(), + OPTIONS); + expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); + expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name"); + + assertEquals(expectedSource, actualSource); + + ScanTableSource.ScanRuntimeProvider provider = + tidbTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + TiKVRichParallelSourceFunction sourceFunction = + (TiKVRichParallelSourceFunction) + ((SourceFunctionProvider) provider).createSourceFunction(); + assertProducedTypeOfSourceFunction(sourceFunction, expectedSource.producedDataType); + } + private Map getAllOptions() { Map options = new HashMap<>(); options.put("connector", "tidb-cdc"); @@ -140,11 +201,21 @@ public class TiDBTableSourceFactoryTest { options), schema), new Configuration(), - TiDBTableSourceFactoryTest.class.getClassLoader(), + TiDBTableSourceFactoryITCase.class.getClassLoader(), false); } private static DynamicTableSource createTableSource(Map options) { return createTableSource(SCHEMA, options); } + + private static void assertProducedTypeOfSourceFunction( + TiKVRichParallelSourceFunction sourceFunction, DataType expectedProducedType) { + TypeInformation producedType = sourceFunction.getProducedType(); + assertThat(producedType, instanceOf(InternalTypeInfo.class)); + InternalTypeInfo rowDataInternalTypeInfo = + (InternalTypeInfo) producedType; + DataType producedDataType = rowDataInternalTypeInfo.getDataType(); + assertEquals(expectedProducedType.toString(), producedDataType.toString()); + } }