[mysql] Support metadata columns for mysql-cdc connector (#496)

pull/497/head
Jark Wu 3 years ago committed by GitHub
parent 64fc3e34b5
commit 12f27acda2
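For reference, the metadata columns introduced by this commit are consumed from Flink SQL DDL through the METADATA [FROM '<key>'] VIRTUAL syntax and are appended after the physical columns. A minimal illustrative sketch (the table, database, and connection options below are placeholders, not taken from this commit):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlCdcMetadataExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Declares the three metadata keys exposed by MySqlReadableMetadata:
        // database_name, table_name and op_ts.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  db_name STRING METADATA FROM 'database_name' VIRTUAL,"
                        + "  table_name STRING METADATA VIRTUAL,"
                        + "  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,"
                        + "  order_id BIGINT,"
                        + "  price DECIMAL(10, 2),"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mysql-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '3306',"
                        + "  'username' = 'flinkuser',"
                        + "  'password' = 'flinkpw',"
                        + "  'database-name' = 'mydb',"
                        + "  'table-name' = 'orders'"
                        + ")");
        tEnv.executeSql("SELECT db_name, table_name, op_ts, order_id, price FROM orders").print();
    }
}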

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.Serializable;
/** Emits a row with physical fields and metadata fields. */
@Internal
public final class AppendMetadataCollector implements Collector<RowData>, Serializable {
private static final long serialVersionUID = 1L;
private final MetadataConverter[] metadataConverters;
public transient SourceRecord inputRecord;
public transient Collector<RowData> outputCollector;
public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
this.metadataConverters = metadataConverters;
}
@Override
public void collect(RowData physicalRow) {
GenericRowData metaRow = new GenericRowData(metadataConverters.length);
for (int i = 0; i < metadataConverters.length; i++) {
Object meta = metadataConverters[i].read(inputRecord);
metaRow.setField(i, meta);
}
RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
outputCollector.collect(outRow);
}
@Override
public void close() {
// nothing to do
}
}
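To make the composition concrete, here is a minimal standalone sketch (illustrative only, not part of this commit) of the row layout the collector emits: the output row is the physical row joined with one metadata field per converter, preserving the physical row's RowKind.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;

public class JoinedRowSketch {
    public static void main(String[] args) {
        // Physical columns (id, name), as produced by the physical runtime converter.
        GenericRowData physicalRow = GenericRowData.of(111L, StringData.fromString("user_111"));
        // Metadata columns (database_name, table_name), as produced by the metadata converters.
        GenericRowData metaRow =
                GenericRowData.of(
                        StringData.fromString("mydb"), StringData.fromString("user_table_1_1"));
        RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
        // Prints 4: the two metadata fields are appended after the two physical fields.
        System.out.println(outRow.getArity());
    }
}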

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium.table;
import org.apache.flink.annotation.Internal;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.Serializable;
/** A converter that converts {@link SourceRecord} metadata into Flink internal data structures. */
@FunctionalInterface
@Internal
public interface MetadataConverter extends Serializable {
Object read(SourceRecord record);
}
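Because MetadataConverter is a functional interface, a converter can also be written as a lambda. A hypothetical sketch (not part of this commit, which uses anonymous classes) that reads the change timestamp from the Debezium source struct, mirroring the op_ts converter defined later in this diff:

import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.flink.table.data.TimestampData;
import org.apache.kafka.connect.data.Struct;

public class MetadataConverterSketch {
    // Hypothetical lambda-style converter; serializable because MetadataConverter extends Serializable.
    static final MetadataConverter OP_TS_CONVERTER =
            sourceRecord -> {
                Struct value = (Struct) sourceRecord.value();
                Struct source = value.getStruct(Envelope.FieldName.SOURCE);
                return TimestampData.fromEpochMillis(
                        (Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY));
            };
}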

@ -30,8 +30,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
@ -55,13 +53,15 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
* RowData}.
*/
public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeserializationSchema<RowData> {
private static final long serialVersionUID = -4852684966051743776L;
private static final long serialVersionUID = 2L;
/** Custom validator to validate the row value. */
public interface ValueValidator extends Serializable {
@ -72,10 +72,18 @@ public final class RowDataDebeziumDeserializeSchema
private final TypeInformation<RowData> resultTypeInfo;
/**
* Runtime converter that converts {@link JsonNode}s into objects of Flink SQL internal data
structures.
Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting of
* physical column values.
*/
private final DeserializationRuntimeConverter runtimeConverter;
private final DeserializationRuntimeConverter physicalConverter;
/** Whether the deserializer needs to handle metadata columns. */
private final boolean hasMetadata;
/**
* A wrapped output collector which is used to append metadata columns after physical columns.
*/
private final AppendMetadataCollector appendMetadataCollector;
/** Time zone of the database server. */
private final ZoneId serverTimeZone;
@ -83,15 +91,23 @@ public final class RowDataDebeziumDeserializeSchema
/** Validator to validate the row value. */
private final ValueValidator validator;
public RowDataDebeziumDeserializeSchema(
RowType rowType,
/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
public static Builder newBuilder() {
return new Builder();
}
RowDataDebeziumDeserializeSchema(
RowType physicalDataType,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> resultTypeInfo,
ValueValidator validator,
ZoneId serverTimeZone) {
this.runtimeConverter = createConverter(rowType);
this.resultTypeInfo = resultTypeInfo;
this.validator = validator;
this.serverTimeZone = serverTimeZone;
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
this.physicalConverter = createConverter(checkNotNull(physicalDataType));
this.resultTypeInfo = checkNotNull(resultTypeInfo);
this.validator = checkNotNull(validator);
this.serverTimeZone = checkNotNull(serverTimeZone);
}
@Override
@ -103,35 +119,46 @@ public final class RowDataDebeziumDeserializeSchema
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
out.collect(insert);
emit(record, insert, out);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
out.collect(delete);
emit(record, delete, out);
} else {
GenericRowData before = extractBeforeRow(value, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(before);
emit(record, before, out);
GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
out.collect(after);
emit(record, after, out);
}
}
private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
return (GenericRowData) runtimeConverter.convert(after, afterSchema);
return (GenericRowData) physicalConverter.convert(after, afterSchema);
}
private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
return (GenericRowData) runtimeConverter.convert(before, beforeSchema);
return (GenericRowData) physicalConverter.convert(before, beforeSchema);
}
private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
if (!hasMetadata) {
collector.collect(physicalRow);
return;
}
appendMetadataCollector.inputRecord = inRecord;
appendMetadataCollector.outputCollector = collector;
appendMetadataCollector.collect(physicalRow);
}
@Override
@ -139,6 +166,49 @@ public final class RowDataDebeziumDeserializeSchema
return resultTypeInfo;
}
// -------------------------------------------------------------------------------------
// Builder
// -------------------------------------------------------------------------------------
/** Builder of {@link RowDataDebeziumDeserializeSchema}. */
public static class Builder {
private RowType physicalRowType;
private TypeInformation<RowData> resultTypeInfo;
private MetadataConverter[] metadataConverters = new MetadataConverter[0];
private ValueValidator validator = (rowData, rowKind) -> {};
private ZoneId serverTimeZone = ZoneId.of("UTC");
public Builder setPhysicalRowType(RowType physicalRowType) {
this.physicalRowType = physicalRowType;
return this;
}
public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
this.metadataConverters = metadataConverters;
return this;
}
public Builder setResultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
this.resultTypeInfo = resultTypeInfo;
return this;
}
public Builder setValueValidator(ValueValidator validator) {
this.validator = validator;
return this;
}
public Builder setServerTimeZone(ZoneId serverTimeZone) {
this.serverTimeZone = serverTimeZone;
return this;
}
public RowDataDebeziumDeserializeSchema build() {
return new RowDataDebeziumDeserializeSchema(
physicalRowType, metadataConverters, resultTypeInfo, validator, serverTimeZone);
}
}
// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/** Defines the supported metadata columns for {@link MySqlTableSource}. */
public enum MySqlReadableMetadata {
/** Name of the table that contains the row. */
TABLE_NAME(
"table_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
}
}),
/** Name of the database that contains the row. */
DATABASE_NAME(
"database_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(
sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
}
}),
/**
* It indicates the time when the change was made in the database. If the record is read from the
* snapshot of the table instead of the binlog, the value is always 0.
*/
OP_TS(
"op_ts",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return TimestampData.fromEpochMillis(
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
});
private final String key;
private final DataType dataType;
private final MetadataConverter converter;
MySqlReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}
public String getKey() {
return key;
}
public DataType getDataType() {
return dataType;
}
public MetadataConverter getConverter() {
return converter;
}
}

@ -26,7 +26,9 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
@ -36,17 +38,22 @@ import com.ververica.cdc.connectors.mysql.source.MySqlParallelSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME;
import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
@ -58,7 +65,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
* description.
*/
public class MySqlTableSource implements ScanTableSource {
public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
private final TableSchema physicalSchema;
private final int port;
@ -76,6 +83,16 @@ public class MySqlTableSource implements ScanTableSource {
private final Duration connectTimeout;
private final StartupOptions startupOptions;
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
/** Data type that describes the final output of the source. */
protected DataType producedDataType;
/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
public MySqlTableSource(
TableSchema physicalSchema,
int port,
@ -107,6 +124,9 @@ public class MySqlTableSource implements ScanTableSource {
this.fetchSize = fetchSize;
this.connectTimeout = connectTimeout;
this.startupOptions = startupOptions;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
}
@Override
@ -121,12 +141,19 @@ public class MySqlTableSource implements ScanTableSource {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toRowDataType());
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
final TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(producedDataType);
DebeziumDeserializationSchema<RowData> deserializer =
new RowDataDebeziumDeserializeSchema(
rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setMetadataConverters(metadataConverters)
.setResultTypeInfo(typeInfo)
.setServerTimeZone(serverTimeZone)
.build();
if (enableParallelRead) {
Configuration configuration = getParallelSourceConf();
MySqlParallelSource<RowData> parallelSource =
@ -153,6 +180,22 @@ public class MySqlTableSource implements ScanTableSource {
}
}
protected MetadataConverter[] getMetadataConverters() {
if (metadataKeys.isEmpty()) {
return new MetadataConverter[0];
}
return metadataKeys.stream()
.map(
key ->
Stream.of(MySqlReadableMetadata.values())
.filter(m -> m.getKey().equals(key))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(MySqlReadableMetadata::getConverter)
.toArray(MetadataConverter[]::new);
}
private Configuration getParallelSourceConf() {
Map<String, String> properties = new HashMap<>();
if (dbzProperties != null) {
@ -204,24 +247,42 @@ public class MySqlTableSource implements ScanTableSource {
return Configuration.fromMap(properties);
}
@Override
public Map<String, DataType> listReadableMetadata() {
return Stream.of(MySqlReadableMetadata.values())
.collect(
Collectors.toMap(
MySqlReadableMetadata::getKey, MySqlReadableMetadata::getDataType));
}
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
this.metadataKeys = metadataKeys;
this.producedDataType = producedDataType;
}
@Override
public DynamicTableSource copy() {
return new MySqlTableSource(
physicalSchema,
port,
hostname,
database,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
serverId,
enableParallelRead,
splitSize,
fetchSize,
connectTimeout,
startupOptions);
MySqlTableSource source =
new MySqlTableSource(
physicalSchema,
port,
hostname,
database,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
serverId,
enableParallelRead,
splitSize,
fetchSize,
connectTimeout,
startupOptions);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
}
@Override
@ -247,7 +308,9 @@ public class MySqlTableSource implements ScanTableSource {
&& Objects.equals(serverTimeZone, that.serverTimeZone)
&& Objects.equals(dbzProperties, that.dbzProperties)
&& Objects.equals(connectTimeout, that.connectTimeout)
&& Objects.equals(startupOptions, that.startupOptions);
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys);
}
@Override
@ -267,7 +330,9 @@ public class MySqlTableSource implements ScanTableSource {
splitSize,
fetchSize,
connectTimeout,
startupOptions);
startupOptions,
producedDataType,
metadataKeys);
}
@Override

@ -39,12 +39,16 @@ import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mysql.MySqlSourceTest.currentMySqlLatestOffset;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.junit.Assert.assertEquals;
/** Integration tests for MySQL binlog SQL source. */
@RunWith(Parameterized.class)
@ -479,6 +483,106 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testMetadataColumns() throws Exception {
userDatabase1.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE mysql_users ("
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " table_name STRING METADATA VIRTUAL,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " email STRING,"
+ " age INT,"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.internal.implementation' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
userDatabase1.getUsername(),
userDatabase1.getPassword(),
userDatabase1.getDatabaseName(),
"user_table_.*",
getDezImplementation(),
incrementalSnapshot,
getServerId(),
getSplitSize());
String sinkDDL =
"CREATE TABLE sink ("
+ " database_name STRING,"
+ " table_name STRING,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " email STRING,"
+ " age INT,"
+ " primary key (database_name, table_name, id) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM mysql_users");
// wait for the snapshot to finish and binlog reading to begin
waitForSinkSize("sink", 2);
try (Connection connection = userDatabase1.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO user_table_1_2 VALUES (200,'user_200','Wuhan',123567891234);");
statement.execute(
"INSERT INTO user_table_1_1 VALUES (300,'user_300','Hangzhou',123567891234, 'user_300@foo.com');");
statement.execute("UPDATE user_table_1_1 SET address='Beijing' WHERE id=300;");
statement.execute("UPDATE user_table_1_2 SET phone_number=88888888 WHERE id=121;");
statement.execute("DELETE FROM user_table_1_1 WHERE id=111;");
}
// wait for the binlog data to be consumed (5 more events)
waitForSinkSize("sink", 7);
List<String> expected =
Stream.of(
"+I[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
"+I[%s, user_table_1_2, 121, user_121, Shanghai, 123567891234, null, null]",
"+I[%s, user_table_1_2, 200, user_200, Wuhan, 123567891234, null, null]",
"+I[%s, user_table_1_1, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
"+U[%s, user_table_1_1, 300, user_300, Beijing, 123567891234, user_300@foo.com, null]",
"+U[%s, user_table_1_2, 121, user_121, Shanghai, 88888888, null, null]",
"-D[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]")
.map(s -> String.format(s, userDatabase1.getDatabaseName()))
.sorted()
.collect(Collectors.toList());
// TODO: we can't assert merged result for incremental-snapshot, because we can't add a
// keyby shuffle before "values" upsert sink. We should assert merged result once
// https://issues.apache.org/jira/browse/FLINK-24511 is fixed.
List<String> actual = TestValuesTableFactory.getRawResults("sink");
Collections.sort(actual);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromLatestOffset() throws Exception {
inventoryDatabase.createAndInitialize();
@ -622,7 +726,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
}
@Test
public void testInconsistentSchema() throws Exception {
public void testShardingTablesWithInconsistentSchema() throws Exception {
userDatabase1.createAndInitialize();
userDatabase2.createAndInitialize();
String sourceDDL =
@ -652,7 +756,9 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
MYSQL_CONTAINER.getDatabasePort(),
userDatabase1.getUsername(),
userDatabase1.getPassword(),
"user_.*",
String.format(
"(%s|%s)",
userDatabase1.getDatabaseName(), userDatabase2.getDatabaseName()),
"user_table_.*",
getDezImplementation(),
incrementalSnapshot,
@ -691,22 +797,9 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
result.getJobClient().get().cancel().get();
// should drop the userDatabase1 and userDatabase2 for the test will run
// three times and create multiply databases with name like user_xxx.
// otherwise it'll read the database created by previous tests for we use `user_.*` to match
// database
try (Connection connection = userDatabase1.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("drop database " + userDatabase1.getDatabaseName());
}
try (Connection connection = userDatabase2.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("drop database " + userDatabase2.getDatabaseName());
}
}
@Ignore
@Ignore("https://github.com/ververica/flink-cdc-connectors/issues/254")
@Test
public void testStartupFromSpecificOffset() throws Exception {
if (incrementalSnapshot) {
@ -796,7 +889,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
result.getJobClient().get().cancel().get();
}
@Ignore
@Ignore("https://github.com/ververica/flink-cdc-connectors/issues/254")
@Test
public void testStartupFromEarliestOffset() throws Exception {
if (incrementalSnapshot) {
@ -881,8 +974,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
result.getJobClient().get().cancel().get();
}
@Ignore("https://github.com/ververica/flink-cdc-connectors/issues/254")
@Test
@Ignore
public void testStartupFromTimestamp() throws Exception {
if (incrementalSnapshot) {
// not support yet

@ -39,6 +39,7 @@ import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@ -66,6 +67,18 @@ public class MySqlTableSourceFactoryTest {
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final ResolvedSchema SCHEMA_WITH_METADATA =
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.BIGINT().notNull()),
Column.physical("name", DataTypes.STRING()),
Column.physical("count", DataTypes.DECIMAL(38, 18)),
Column.metadata("time", DataTypes.TIMESTAMP(3), "op_ts", true),
Column.metadata(
"_database_name", DataTypes.STRING(), "database_name", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
@ -332,6 +345,42 @@ public class MySqlTableSourceFactoryTest {
assertEquals(expectedSource, actualSource);
}
@Test
public void testMetadataColumns() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
MySqlTableSource mySqlSource = (MySqlTableSource) actualSource;
mySqlSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = mySqlSource.copy();
MySqlTableSource expectedSource =
new MySqlTableSource(
TableSchemaUtils.getPhysicalSchema(
fromResolvedSchema(SCHEMA_WITH_METADATA)),
3306,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
null,
false,
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
CONNECT_TIMEOUT.defaultValue(),
StartupOptions.initial());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
assertEquals(expectedSource, actualSource);
}
@Test
public void testValidation() {
// validate illegal port
@ -422,19 +471,24 @@ public class MySqlTableSourceFactoryTest {
return options;
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
private static DynamicTableSource createTableSource(
ResolvedSchema schema, Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new ResolvedCatalogTable(
CatalogTable.of(
fromResolvedSchema(SCHEMA).toSchema(),
fromResolvedSchema(schema).toSchema(),
"mock source",
new ArrayList<>(),
options),
SCHEMA),
schema),
new Configuration(),
MySqlTableSourceFactoryTest.class.getClassLoader(),
false);
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
return createTableSource(SCHEMA, options);
}
}

@ -61,9 +61,9 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestB
/** Integration tests to check that mysql-cdc works well under different MySQL server timezones. */
@RunWith(Parameterized.class)
public class MysqlTimezoneITCase {
public class MySqlTimezoneITCase {
private static final Logger LOG = LoggerFactory.getLogger(MysqlTimezoneITCase.class);
private static final Logger LOG = LoggerFactory.getLogger(MySqlTimezoneITCase.class);
private static TemporaryFolder tempFolder;
private static File resourceFolder;
private final StreamExecutionEnvironment env =
@ -101,13 +101,13 @@ public class MysqlTimezoneITCase {
}
@Test
public void testMysqlServerInBerlin() throws Exception {
testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai");
public void testMySqlServerInBerlin() throws Exception {
testTemporalTypesWithMySqlServerTimezone("Europe/Berlin");
}
@Test
public void testMysqlServerInShanghai() throws Exception {
testTemporalTypesWithMySqlServerTimezone("Europe/Berlin");
public void testMySqlServerInShanghai() throws Exception {
testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai");
}
private void testTemporalTypesWithMySqlServerTimezone(String timezone) throws Exception {

@ -60,11 +60,10 @@ public class RecordsFormatter {
this.typeInfo =
(TypeInformation<RowData>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
this.deserializationSchema =
new RowDataDebeziumDeserializeSchema(
(RowType) dataType.getLogicalType(),
typeInfo,
((rowData, rowKind) -> {}),
ZoneId.of("UTC"));
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType((RowType) dataType.getLogicalType())
.setResultTypeInfo(typeInfo)
.build();
this.collector = new SimpleCollector();
this.rowRowConverter = RowRowConverter.create(dataType);
rowRowConverter.open(Thread.currentThread().getContextClassLoader());

@ -33,7 +33,6 @@ import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import java.time.ZoneId;
import java.util.Objects;
import java.util.Properties;
@ -97,12 +96,13 @@ public class PostgreSQLTableSource implements ScanTableSource {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer =
new RowDataDebeziumDeserializeSchema(
rowType,
typeInfo,
new PostgresValueValidator(schemaName, tableName),
ZoneId.of("UTC"));
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(rowType)
.setResultTypeInfo(typeInfo)
.setValueValidator(new PostgresValueValidator(schemaName, tableName))
.build();
DebeziumSourceFunction<RowData> sourceFunction =
PostgreSQLSource.<RowData>builder()
.hostname(hostname)
