[tidb] Support metadata column

pull/898/head
eastfisher 3 years ago committed by Leonard Xu
parent 570cba1159
commit a728c89937

@@ -38,6 +38,7 @@ import static org.tikv.common.codec.TableCodec.decodeObjects;
* RowData}.
*/
public class RowDataTiKVChangeEventDeserializationSchema
extends RowDataTiKVEventDeserializationSchemaBase
implements TiKVChangeEventDeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
@@ -49,7 +50,11 @@ public class RowDataTiKVChangeEventDeserializationSchema
private final TiTableInfo tableInfo;
public RowDataTiKVChangeEventDeserializationSchema(
TypeInformation<RowData> resultTypeInfo, TiTableInfo tableInfo) {
TypeInformation<RowData> resultTypeInfo,
TiTableInfo tableInfo,
TiKVMetadataConverter[] metadataConverters) {
super(metadataConverters);
this.resultTypeInfo = resultTypeInfo;
this.tableInfo = tableInfo;
}
@@ -60,23 +65,24 @@ public class RowDataTiKVChangeEventDeserializationSchema
final long handle = rowKey.getHandle();
switch (row.getOpType()) {
case DELETE:
out.collect(
RowData rowDataDelete =
GenericRowData.ofKind(
RowKind.DELETE,
getObjectsWithDataTypes(
decodeObjects(
row.getOldValue().toByteArray(), handle, tableInfo),
tableInfo)));
tableInfo));
emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataDelete, out);
break;
case PUT:
try {
if (row.getOldValue() == null) {
out.collect(
RowData rowDataInsert =
GenericRowData.ofKind(
RowKind.INSERT,
getRowDataFields(
row.getValue().toByteArray(), handle, tableInfo)));
row.getValue().toByteArray(), handle, tableInfo));
emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataInsert, out);
} else {
// TODO: the TiKV CDC client doesn't return the old value in PUT events
// if (!row.getOldValue().isEmpty()) {
@@ -89,11 +95,12 @@ public class RowDataTiKVChangeEventDeserializationSchema
// handle,
// tableInfo)));
// }
out.collect(
RowData rowDataUpdate =
GenericRowData.ofKind(
RowKind.UPDATE_AFTER,
getRowDataFields(
row.getValue().toByteArray(), handle, tableInfo)));
row.getValue().toByteArray(), handle, tableInfo));
emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataUpdate, out);
}
break;
} catch (final RuntimeException e) {

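Both deserializer changes (this one and the snapshot variant below) follow a single pattern: each direct out.collect(...) call is replaced by emit(...), which routes the row through the new base class so metadata columns can be appended. A condensed before/after sketch, where fields stands in for the decoded column values:

// Before: the physical row went straight to the collector.
out.collect(GenericRowData.ofKind(RowKind.DELETE, fields));

// After: the physical row is paired with its originating TiKV record, so
// converters can read per-row metadata such as the event timestamp.
RowData rowDataDelete = GenericRowData.ofKind(RowKind.DELETE, fields);
emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataDelete, out);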
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Base class of deserialization schemas that convert a TiKV RowValue (snapshot or change event)
* into the Flink Table/SQL internal data structure {@link RowData}.
*/
public class RowDataTiKVEventDeserializationSchemaBase implements Serializable {
private static final long serialVersionUID = 1L;
/** Whether the deserializer needs to handle metadata columns. */
private final boolean hasMetadata;
/**
* A wrapped output collector which is used to append metadata columns after physical columns.
*/
private final TiKVAppendMetadataCollector appendMetadataCollector;
public RowDataTiKVEventDeserializationSchemaBase(TiKVMetadataConverter[] metadataConverters) {
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
this.appendMetadataCollector = new TiKVAppendMetadataCollector(metadataConverters);
}
public void emit(
TiKVMetadataConverter.TiKVRowValue inRecord,
RowData physicalRow,
Collector<RowData> collector) {
if (!hasMetadata) {
collector.collect(physicalRow);
return;
}
appendMetadataCollector.row = inRecord;
appendMetadataCollector.outputCollector = collector;
appendMetadataCollector.collect(physicalRow);
}
}
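For illustration, a minimal sketch of how this base class is driven (converter construction mirrors TiDBTableSource#getMetadataConverters below; row, physicalRow and out are assumed to be in scope):

TiKVMetadataConverter[] converters =
        java.util.Arrays.stream(
                        TiKVReadableMetadata.createTiKVReadableMetadata("inventory", "products"))
                .map(TiKVReadableMetadata::getConverter)
                .toArray(TiKVMetadataConverter[]::new);

RowDataTiKVEventDeserializationSchemaBase base =
        new RowDataTiKVEventDeserializationSchemaBase(converters);

// With a non-empty converter array, emit() appends database_name, table_name
// and op_ts after the physical columns; with an empty array the physical row
// is forwarded unchanged.
base.emit(new TiKVMetadataConverter.TiKVRowValue(row), physicalRow, out);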

@@ -37,6 +37,7 @@ import static org.tikv.common.codec.TableCodec.decodeObjects;
* RowData}.
*/
public class RowDataTiKVSnapshotEventDeserializationSchema
extends RowDataTiKVEventDeserializationSchemaBase
implements TiKVSnapshotEventDeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
@@ -48,7 +49,11 @@ public class RowDataTiKVSnapshotEventDeserializationSchema
private final TiTableInfo tableInfo;
public RowDataTiKVSnapshotEventDeserializationSchema(
TypeInformation<RowData> resultTypeInfo, TiTableInfo tableInfo) {
TypeInformation<RowData> resultTypeInfo,
TiTableInfo tableInfo,
TiKVMetadataConverter[] metadataConverters) {
super(metadataConverters);
this.resultTypeInfo = resultTypeInfo;
this.tableInfo = tableInfo;
}
@@ -57,10 +62,12 @@ public class RowDataTiKVSnapshotEventDeserializationSchema
public void deserialize(KvPair record, Collector<RowData> out) throws Exception {
final RowKey rowKey = RowKey.decode(record.getKey().toByteArray());
final long handle = rowKey.getHandle();
out.collect(
RowData rowData =
GenericRowData.ofKind(
RowKind.INSERT,
getRowDataFields(record.getValue().toByteArray(), handle, tableInfo)));
getRowDataFields(record.getValue().toByteArray(), handle, tableInfo));
emit(new TiKVMetadataConverter.TiKVRowValue(record), rowData, out);
}
private static Object[] getRowDataFields(byte[] value, Long handle, TiTableInfo tableInfo) {

@@ -24,6 +24,7 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
@@ -35,8 +36,12 @@ import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.meta.TiTableInfo;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* A {@link DynamicTableSource} that describes how to create a TiDB binlog from a logical
* description.
*/
public class TiDBTableSource implements ScanTableSource {
public class TiDBTableSource implements ScanTableSource, SupportsReadingMetadata {
private final ResolvedSchema physicalSchema;
private final String hostname;
@@ -63,6 +68,9 @@ public class TiDBTableSource implements ScanTableSource {
/** Data type that describes the final output of the source. */
protected DataType producedDataType;
/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
public TiDBTableSource(
ResolvedSchema physicalSchema,
String hostname,
@@ -83,6 +91,7 @@ public class TiDBTableSource implements ScanTableSource {
this.startupOptions = startupOptions;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.options = options;
this.metadataKeys = Collections.emptyList();
}
@Override
@@ -103,10 +112,13 @@ public class TiDBTableSource implements ScanTableSource {
final TiTableInfo tableInfo = session.getCatalog().getTable(database, tableName);
TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
TiKVMetadataConverter[] metadataConverters = getMetadataConverters();
RowDataTiKVSnapshotEventDeserializationSchema snapshotEventDeserializationSchema =
new RowDataTiKVSnapshotEventDeserializationSchema(typeInfo, tableInfo);
new RowDataTiKVSnapshotEventDeserializationSchema(
typeInfo, tableInfo, metadataConverters);
RowDataTiKVChangeEventDeserializationSchema changeEventDeserializationSchema =
new RowDataTiKVChangeEventDeserializationSchema(typeInfo, tableInfo);
new RowDataTiKVChangeEventDeserializationSchema(
typeInfo, tableInfo, metadataConverters);
TiDBSource.Builder<RowData> builder =
TiDBSource.<RowData>builder()
@@ -140,9 +152,28 @@ public class TiDBTableSource implements ScanTableSource {
startupOptions,
options);
source.producedDataType = producedDataType;
source.metadataKeys = metadataKeys;
return source;
}
private TiKVMetadataConverter[] getMetadataConverters() {
if (metadataKeys.isEmpty()) {
return new TiKVMetadataConverter[0];
}
return metadataKeys.stream()
.map(
key ->
Stream.of(
TiKVReadableMetadata.createTiKVReadableMetadata(
database, tableName))
.filter(m -> m.getKey().equals(key))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(TiKVReadableMetadata::getConverter)
.toArray(TiKVMetadataConverter[]::new);
}
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -181,4 +212,18 @@ public class TiDBTableSource implements ScanTableSource {
public String asSummaryString() {
return "TiDB-CDC";
}
@Override
public Map<String, DataType> listReadableMetadata() {
return Stream.of(TiKVReadableMetadata.createTiKVReadableMetadata(database, tableName))
.collect(
Collectors.toMap(
TiKVReadableMetadata::getKey, TiKVReadableMetadata::getDataType));
}
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
this.metadataKeys = metadataKeys;
this.producedDataType = producedDataType;
}
}
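With SupportsReadingMetadata wired up, the three metadata keys become declarable as METADATA columns in SQL DDL. A usage sketch (connection values borrowed from the test below; the table layout itself is illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
tEnv.executeSql(
        "CREATE TABLE products ("
                + "  db_name STRING METADATA FROM 'database_name' VIRTUAL,"
                + "  tbl_name STRING METADATA FROM 'table_name' VIRTUAL,"
                + "  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,"
                + "  id BIGINT NOT NULL,"
                + "  name STRING,"
                + "  PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'tidb-cdc',"
                + "  'hostname' = 'tidb0:4000',"
                + "  'pd-addresses' = 'pd0:2379',"
                + "  'username' = 'root',"
                + "  'password' = '',"
                + "  'database-name' = 'inventory',"
                + "  'table-name' = 'products'"
                + ")");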

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.util.Collector;
import java.io.Serializable;
/** Emits a row with physical fields and metadata fields. */
public class TiKVAppendMetadataCollector implements Collector<RowData>, Serializable {
private static final long serialVersionUID = 1L;
private final TiKVMetadataConverter[] metadataConverters;
public transient TiKVMetadataConverter.TiKVRowValue row;
public transient Collector<RowData> outputCollector;
public TiKVAppendMetadataCollector(TiKVMetadataConverter[] metadataConverters) {
this.metadataConverters = metadataConverters;
}
@Override
public void collect(RowData physicalRow) {
GenericRowData metaRow = new GenericRowData(metadataConverters.length);
for (int i = 0; i < metadataConverters.length; i++) {
Object meta = metadataConverters[i].read(row);
metaRow.setField(i, meta);
}
RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
outputCollector.collect(outRow);
}
@Override
public void close() {
// nothing to do
}
}
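To make the composition concrete, a small sketch of what collect() produces, assuming two metadata converters and hypothetical values (StringData is the only import beyond those of the class above):

GenericRowData physical = GenericRowData.of(1001L, StringData.fromString("scooter"));
GenericRowData meta =
        GenericRowData.of(StringData.fromString("inventory"), StringData.fromString("products"));

// Same shape as collect(): metadata fields are appended after the physical
// fields and the physical row's RowKind is preserved.
RowData joined = new JoinedRowData(physical.getRowKind(), physical, meta);
// joined.getArity() == 4; fields 0-1 are physical, fields 2-3 are metadata.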

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.annotation.Internal;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Kvrpcpb;
import java.io.Serializable;
/** A converter that converts TiKV row metadata into Flink internal data structures. */
@FunctionalInterface
@Internal
public interface TiKVMetadataConverter extends Serializable {
Object read(TiKVRowValue row);
/** A TiKV row value: wraps either a snapshot {@link Kvrpcpb.KvPair} or a change-event {@link Cdcpb.Event.Row}. */
class TiKVRowValue {
public boolean isKv;
public Kvrpcpb.KvPair kvPair;
public Cdcpb.Event.Row row;
public TiKVRowValue(Kvrpcpb.KvPair kvPair) {
this.isKv = true;
this.kvPair = kvPair;
}
public TiKVRowValue(Cdcpb.Event.Row row) {
this.isKv = false;
this.row = row;
}
}
}
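Since the interface is a Serializable @FunctionalInterface, a converter can also be written as a lambda. A hypothetical example (not part of this commit) that reads the change event's start timestamp, falling back to 0 for snapshot pairs:

TiKVMetadataConverter startTsAsLong = row -> row.isKv ? 0L : row.row.getStartTs();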

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import java.util.ArrayList;
import java.util.List;
/** Defines the supported metadata columns for {@link TiDBTableSource}. */
public class TiKVReadableMetadata {
private final String key;
private final DataType dataType;
private final TiKVMetadataConverter converter;
TiKVReadableMetadata(String key, DataType dataType, TiKVMetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}
public String getKey() {
return key;
}
public DataType getDataType() {
return dataType;
}
public TiKVMetadataConverter getConverter() {
return converter;
}
/** Name of the table that contains the row. */
public static TiKVReadableMetadata createTableNameMetadata(String tableName) {
return new TiKVReadableMetadata(
"table_name",
DataTypes.STRING().notNull(),
new TiKVMetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(TiKVRowValue row) {
return StringData.fromString(tableName);
}
});
}
/** Name of the database that contains the row. */
public static TiKVReadableMetadata createDatabaseNameMetadata(String database) {
return new TiKVReadableMetadata(
"database_name",
DataTypes.STRING().notNull(),
new TiKVMetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(TiKVRowValue row) {
return StringData.fromString(database);
}
});
}
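/** Timestamp of the change in TiKV: the event's start_ts, or epoch 0 for snapshot records. */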
public static TiKVReadableMetadata createOpTsMetadata() {
return new TiKVReadableMetadata(
"op_ts",
DataTypes.TIMESTAMP_LTZ(3).notNull(),
new TiKVMetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(TiKVRowValue row) {
if (row.isKv) {
// We cannot get the ts from a KvPair, so use a default value.
return TimestampData.fromEpochMillis(0);
} else {
return TimestampData.fromEpochMillis(row.row.getStartTs());
}
}
});
}
public static TiKVReadableMetadata[] createTiKVReadableMetadata(
String database, String tableName) {
List<TiKVReadableMetadata> list = new ArrayList<>();
list.add(createDatabaseNameMetadata(database));
list.add(createTableNameMetadata(tableName));
list.add(createOpTsMetadata());
return list.toArray(new TiKVReadableMetadata[0]);
}
}
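A quick sketch of the keys and types this factory exposes, which is exactly what TiDBTableSource#listReadableMetadata hands to the planner (output rendered informally):

for (TiKVReadableMetadata m :
        TiKVReadableMetadata.createTiKVReadableMetadata("inventory", "products")) {
    System.out.println(m.getKey() + " -> " + m.getDataType());
}
// database_name -> STRING NOT NULL
// table_name    -> STRING NOT NULL
// op_ts         -> TIMESTAMP_LTZ(3) NOT NULL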

@@ -18,6 +18,7 @@
package com.ververica.cdc.connectors.tidb.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
@@ -28,19 +29,30 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.connectors.tidb.TiDBTestBase;
import com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
/** Integration tests for TiDB table source factory. */
public class TiDBTableSourceFactoryTest {
public class TiDBTableSourceFactoryITCase extends TiDBTestBase {
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
@@ -53,12 +65,26 @@ public class TiDBTableSourceFactoryTest {
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final String MY_HOSTNAME = "localhost:4000";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
private static final String MY_DATABASE = "myDB";
private static final String MY_TABLE = "myTable";
private static final String PD_ADDRESS = "pd:2379";
private static final ResolvedSchema SCHEMA_WITH_METADATA =
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.BIGINT().notNull()),
Column.physical("name", DataTypes.STRING()),
Column.physical("count", DataTypes.DECIMAL(38, 18)),
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
Column.metadata(
"database_name", DataTypes.STRING(), "database_name", true),
Column.metadata("table_name", DataTypes.STRING(), "table_name", true),
Column.metadata("op_ts", DataTypes.TIMESTAMP(), "op_ts", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
private static final String MY_HOSTNAME = "tidb0:4000";
private static final String MY_USERNAME = "root";
private static final String MY_PASSWORD = "";
private static final String MY_DATABASE = "inventory";
private static final String MY_TABLE = "products";
private static final String PD_ADDRESS = "pd0:2379";
private static final Map<String, String> OPTIONS = new HashMap<>();
@Test
@@ -114,6 +140,41 @@ public class TiDBTableSourceFactoryTest {
assertEquals(expectedSource, actualSource);
}
@Test
public void testMetadataColumns() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
TiDBTableSource tidbTableSource = (TiDBTableSource) actualSource;
tidbTableSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name", "table_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = tidbTableSource.copy();
TiDBTableSource expectedSource =
new TiDBTableSource(
SCHEMA_WITH_METADATA,
MY_HOSTNAME,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
PD_ADDRESS,
StartupOptions.latest(),
OPTIONS);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
tidbTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
TiKVRichParallelSourceFunction<RowData> sourceFunction =
(TiKVRichParallelSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(sourceFunction, expectedSource.producedDataType);
}
private Map<String, String> getAllOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "tidb-cdc");
@@ -140,11 +201,21 @@ public class TiDBTableSourceFactoryTest {
options),
schema),
new Configuration(),
TiDBTableSourceFactoryTest.class.getClassLoader(),
TiDBTableSourceFactoryITCase.class.getClassLoader(),
false);
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
return createTableSource(SCHEMA, options);
}
private static void assertProducedTypeOfSourceFunction(
TiKVRichParallelSourceFunction<RowData> sourceFunction, DataType expectedProducedType) {
TypeInformation<RowData> producedType = sourceFunction.getProducedType();
assertThat(producedType, instanceOf(InternalTypeInfo.class));
InternalTypeInfo<RowData> rowDataInternalTypeInfo =
(InternalTypeInfo<RowData>) producedType;
DataType producedDataType = rowDataInternalTypeInfo.getDataType();
assertEquals(expectedProducedType.toString(), producedDataType.toString());
}
}