From 12f27acda20bc1a747d566d9661d7c859affb469 Mon Sep 17 00:00:00 2001
From: Jark Wu
Date: Tue, 12 Oct 2021 15:09:29 +0800
Subject: [PATCH] [mysql] Support metadata columns for mysql-cdc connector (#496)

---
 .../table/AppendMetadataCollector.java        |  60 ++++++++
 .../cdc/debezium/table/MetadataConverter.java |  32 +++++
 .../RowDataDebeziumDeserializeSchema.java     | 106 +++++++++++---
 .../mysql/table/MySqlReadableMetadata.java    | 108 +++++++++++++++
 .../mysql/table/MySqlTableSource.java         | 113 +++++++++++----
 .../mysql/table/MySqlConnectorITCase.java     | 129 +++++++++++++++---
 .../table/MySqlTableSourceFactoryTest.java    |  60 +++++++-
 ...neITCase.java => MySqlTimezoneITCase.java} |  12 +-
 .../mysql/testutils/RecordsFormatter.java     |   9 +-
 .../postgres/table/PostgreSQLTableSource.java |  12 +-
 10 files changed, 561 insertions(+), 80 deletions(-)
 create mode 100644 flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/AppendMetadataCollector.java
 create mode 100644 flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/MetadataConverter.java
 create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java
 rename flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/{MysqlTimezoneITCase.java => MySqlTimezoneITCase.java} (98%)

diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/AppendMetadataCollector.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/AppendMetadataCollector.java
new file mode 100644
index 000000000..375365f65
--- /dev/null
+++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/AppendMetadataCollector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.debezium.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+
+/** Emits a row with physical fields and metadata fields. */
+@Internal
+public final class AppendMetadataCollector implements Collector<RowData>, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final MetadataConverter[] metadataConverters;
+
+    public transient SourceRecord inputRecord;
+    public transient Collector<RowData> outputCollector;
+
+    public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
+        this.metadataConverters = metadataConverters;
+    }
+
+    @Override
+    public void collect(RowData physicalRow) {
+        GenericRowData metaRow = new GenericRowData(metadataConverters.length);
+        for (int i = 0; i < metadataConverters.length; i++) {
+            Object meta = metadataConverters[i].read(inputRecord);
+            metaRow.setField(i, meta);
+        }
+        RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
+        outputCollector.collect(outRow);
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/MetadataConverter.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/MetadataConverter.java
new file mode 100644
index 000000000..d9fe772dd
--- /dev/null
+++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/MetadataConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.debezium.table;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+
+/** A converter that converts {@link SourceRecord} metadata into Flink internal data structures. */
+@FunctionalInterface
+@Internal
+public interface MetadataConverter extends Serializable {
+    Object read(SourceRecord record);
+}
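The MetadataConverter contract above is the extension point the rest of this patch builds on. As an illustration (not part of the patch), a custom converter follows the same pattern the MySQL connector uses below: unwrap the Debezium change-event envelope, then read a field of its "source" struct. The "file" field (the current binlog file name) is an assumption about Debezium's MySQL source-info schema, not something this patch defines:

import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.data.Envelope;
import org.apache.flink.table.data.StringData;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class BinlogFileNameConverter implements MetadataConverter {
    private static final long serialVersionUID = 1L;

    @Override
    public Object read(SourceRecord record) {
        // Debezium wraps every change event in an envelope; provenance metadata
        // lives in its "source" struct.
        Struct messageStruct = (Struct) record.value();
        Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
        // "file" is an assumed field name; convert to Flink's internal string type.
        return StringData.fromString(sourceStruct.getString("file"));
    }
}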
diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index 9349cd17d..211b6640a 100644
--- a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -30,8 +30,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.utils.TemporalConversions;
 import io.debezium.data.Envelope;
@@ -55,13 +53,15 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
  * RowData}.
  */
 public final class RowDataDebeziumDeserializeSchema
         implements DebeziumDeserializationSchema<RowData> {
-    private static final long serialVersionUID = -4852684966051743776L;
+    private static final long serialVersionUID = 2L;
 
     /** Custom validator to validate the row value. */
     public interface ValueValidator extends Serializable {
@@ -72,10 +72,18 @@ public final class RowDataDebeziumDeserializeSchema
     private final TypeInformation<RowData> resultTypeInfo;
 
     /**
-     * Runtime converter that converts {@link JsonNode}s into objects of Flink SQL internal data
-     * structures.
      *
+     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting
+     * of physical column values.
      */
-    private final DeserializationRuntimeConverter runtimeConverter;
+    private final DeserializationRuntimeConverter physicalConverter;
+
+    /** Whether the deserializer needs to handle metadata columns. */
+    private final boolean hasMetadata;
+
+    /**
+     * A wrapped output collector which is used to append metadata columns after physical columns.
+     */
+    private final AppendMetadataCollector appendMetadataCollector;
 
     /** Time zone of the database server. */
     private final ZoneId serverTimeZone;
@@ -83,15 +91,23 @@ public final class RowDataDebeziumDeserializeSchema
     /** Validator to validate the row value. */
     private final ValueValidator validator;
 
-    public RowDataDebeziumDeserializeSchema(
-            RowType rowType,
+    /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    RowDataDebeziumDeserializeSchema(
+            RowType physicalDataType,
+            MetadataConverter[] metadataConverters,
             TypeInformation<RowData> resultTypeInfo,
             ValueValidator validator,
             ZoneId serverTimeZone) {
-        this.runtimeConverter = createConverter(rowType);
-        this.resultTypeInfo = resultTypeInfo;
-        this.validator = validator;
-        this.serverTimeZone = serverTimeZone;
+        this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+        this.physicalConverter = createConverter(checkNotNull(physicalDataType));
+        this.resultTypeInfo = checkNotNull(resultTypeInfo);
+        this.validator = checkNotNull(validator);
+        this.serverTimeZone = checkNotNull(serverTimeZone);
     }
 
     @Override
@@ -103,35 +119,46 @@ public final class RowDataDebeziumDeserializeSchema
             GenericRowData insert = extractAfterRow(value, valueSchema);
             validator.validate(insert, RowKind.INSERT);
             insert.setRowKind(RowKind.INSERT);
-            out.collect(insert);
+            emit(record, insert, out);
         } else if (op == Envelope.Operation.DELETE) {
             GenericRowData delete = extractBeforeRow(value, valueSchema);
             validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
-            out.collect(delete);
+            emit(record, delete, out);
         } else {
             GenericRowData before = extractBeforeRow(value, valueSchema);
             validator.validate(before, RowKind.UPDATE_BEFORE);
             before.setRowKind(RowKind.UPDATE_BEFORE);
-            out.collect(before);
+            emit(record, before, out);
 
             GenericRowData after = extractAfterRow(value, valueSchema);
             validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
-            out.collect(after);
+            emit(record, after, out);
         }
     }
 
     private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
         Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
         Struct after = value.getStruct(Envelope.FieldName.AFTER);
-        return (GenericRowData) runtimeConverter.convert(after, afterSchema);
+        return (GenericRowData) physicalConverter.convert(after, afterSchema);
     }
 
     private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
         Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
         Struct before = value.getStruct(Envelope.FieldName.BEFORE);
-        return (GenericRowData) runtimeConverter.convert(before, beforeSchema);
+        return (GenericRowData) physicalConverter.convert(before, beforeSchema);
+    }
+
+    private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
+        if (!hasMetadata) {
+            collector.collect(physicalRow);
+            return;
+        }
+
+        appendMetadataCollector.inputRecord = inRecord;
+        appendMetadataCollector.outputCollector = collector;
+        appendMetadataCollector.collect(physicalRow);
     }
 
     @Override
@@ -139,6 +166,49 @@ public final class RowDataDebeziumDeserializeSchema
         return resultTypeInfo;
     }
 
+    // -------------------------------------------------------------------------------------
+    // Builder
+    // -------------------------------------------------------------------------------------
+
+    /** Builder of {@link RowDataDebeziumDeserializeSchema}. */
+    public static class Builder {
+        private RowType physicalRowType;
+        private TypeInformation<RowData> resultTypeInfo;
+        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
+        private ValueValidator validator = (rowData, rowKind) -> {};
+        private ZoneId serverTimeZone = ZoneId.of("UTC");
+
+        public Builder setPhysicalRowType(RowType physicalRowType) {
+            this.physicalRowType = physicalRowType;
+            return this;
+        }
+
+        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder setResultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
+            this.resultTypeInfo = resultTypeInfo;
+            return this;
+        }
+
+        public Builder setValueValidator(ValueValidator validator) {
+            this.validator = validator;
+            return this;
+        }
+
+        public Builder setServerTimeZone(ZoneId serverTimeZone) {
+            this.serverTimeZone = serverTimeZone;
+            return this;
+        }
+
+        public RowDataDebeziumDeserializeSchema build() {
+            return new RowDataDebeziumDeserializeSchema(
+                    physicalRowType, metadataConverters, resultTypeInfo, validator, serverTimeZone);
+        }
+    }
+
     // -------------------------------------------------------------------------------------
     // Runtime Converters
     // -------------------------------------------------------------------------------------
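For orientation, this is roughly how the new builder is driven. A minimal sketch: the column names are illustrative, and TypeInformation.of(RowData.class) is a generic stand-in for the type information that ScanContext#createTypeInformation supplies in the planner-integrated path shown later in this patch:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

import java.time.ZoneId;

public class DeserializerSketch {

    public static RowDataDebeziumDeserializeSchema create() {
        // Physical row type of the captured table (illustrative columns).
        RowType physicalRowType =
                (RowType)
                        DataTypes.ROW(
                                        DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
                                        DataTypes.FIELD("name", DataTypes.STRING()))
                                .getLogicalType();

        // Metadata converters and the value validator fall back to the builder's
        // defaults: no metadata columns and a no-op validator.
        return RowDataDebeziumDeserializeSchema.newBuilder()
                .setPhysicalRowType(physicalRowType)
                .setResultTypeInfo(TypeInformation.of(RowData.class))
                .setServerTimeZone(ZoneId.of("UTC"))
                .build();
    }
}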
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java
new file mode 100644
index 000000000..a5b44c864
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** Defines the supported metadata columns for {@link MySqlTableSource}. */
+public enum MySqlReadableMetadata {
+    /** Name of the table that contains the row. */
+    TABLE_NAME(
+            "table_name",
+            DataTypes.STRING().notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return StringData.fromString(
+                            sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+                }
+            }),
+
+    /** Name of the database that contains the row. */
+    DATABASE_NAME(
+            "database_name",
+            DataTypes.STRING().notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return StringData.fromString(
+                            sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+                }
+            }),
+
+    /**
+     * Indicates the time when the change was made in the database. If the record was read from a
+     * snapshot of the table instead of the binlog, the value is always 0.
+     */
+    OP_TS(
+            "op_ts",
+            DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return TimestampData.fromEpochMillis(
+                            (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+                }
+            });
+
+    private final String key;
+
+    private final DataType dataType;
+
+    private final MetadataConverter converter;
+
+    MySqlReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+        this.key = key;
+        this.dataType = dataType;
+        this.converter = converter;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public DataType getDataType() {
+        return dataType;
+    }
+
+    public MetadataConverter getConverter() {
+        return converter;
+    }
+}
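To make the three metadata keys concrete: in DDL they surface as METADATA columns, as the integration test later in this patch exercises. A minimal sketch, assuming placeholder connection options and a Flink version whose DDL accepts TIMESTAMP_LTZ(3) as shorthand for TIMESTAMP(3) WITH LOCAL TIME ZONE:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MetadataColumnsSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // METADATA [FROM '<key>'] VIRTUAL declares a read-only metadata column; FROM
        // can be omitted when the column name equals the key (table_name, op_ts).
        tEnv.executeSql(
                "CREATE TABLE mysql_orders ("
                        + "  db_name STRING METADATA FROM 'database_name' VIRTUAL,"
                        + "  table_name STRING METADATA VIRTUAL,"
                        + "  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,"
                        + "  order_id BIGINT NOT NULL,"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mysql-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '3306',"
                        + "  'username' = 'flinkuser',"
                        + "  'password' = 'flinkpw',"
                        + "  'database-name' = 'mydb',"
                        + "  'table-name' = 'orders'"
                        + ")");

        tEnv.executeSql("SELECT db_name, table_name, op_ts, order_id FROM mysql_orders").print();
    }
}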
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java
index c663685a4..11577200e 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -26,7 +26,9 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
@@ -36,17 +38,22 @@ import com.ververica.cdc.connectors.mysql.source.MySqlParallelSource;
 import com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions;
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.table.MetadataConverter;
 import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
 
 import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.time.ZoneId;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME;
 import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
@@ -58,7 +65,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
  * description.
  */
-public class MySqlTableSource implements ScanTableSource {
+public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
 
     private final TableSchema physicalSchema;
     private final int port;
@@ -76,6 +83,16 @@ public class MySqlTableSource implements ScanTableSource {
     private final Duration connectTimeout;
     private final StartupOptions startupOptions;
 
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
     public MySqlTableSource(
             TableSchema physicalSchema,
             int port,
@@ -107,6 +124,9 @@ public class MySqlTableSource implements ScanTableSource {
         this.fetchSize = fetchSize;
         this.connectTimeout = connectTimeout;
         this.startupOptions = startupOptions;
+        // Mutable attributes
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
     }
 
     @Override
@@ -121,12 +141,19 @@ public class MySqlTableSource implements ScanTableSource {
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
-        TypeInformation<RowData> typeInfo =
-                scanContext.createTypeInformation(physicalSchema.toRowDataType());
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        final TypeInformation<RowData> typeInfo =
+                scanContext.createTypeInformation(producedDataType);
+
         DebeziumDeserializationSchema<RowData> deserializer =
-                new RowDataDebeziumDeserializeSchema(
-                        rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
+                        .setResultTypeInfo(typeInfo)
+                        .setServerTimeZone(serverTimeZone)
+                        .build();
         if (enableParallelRead) {
             Configuration configuration = getParallelSourceConf();
             MySqlParallelSource<RowData> parallelSource =
@@ -153,6 +180,22 @@ public class MySqlTableSource implements ScanTableSource {
         }
     }
 
+    protected MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key ->
+                                Stream.of(MySqlReadableMetadata.values())
+                                        .filter(m -> m.getKey().equals(key))
+                                        .findFirst()
+                                        .orElseThrow(IllegalStateException::new))
+                .map(MySqlReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
     private Configuration getParallelSourceConf() {
         Map<String, String> properties = new HashMap<>();
         if (dbzProperties != null) {
@@ -204,24 +247,42 @@ public class MySqlTableSource implements ScanTableSource {
         return Configuration.fromMap(properties);
     }
 
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(MySqlReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                MySqlReadableMetadata::getKey, MySqlReadableMetadata::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
     @Override
     public DynamicTableSource copy() {
-        return new MySqlTableSource(
-                physicalSchema,
-                port,
-                hostname,
-                database,
-                tableName,
-                username,
-                password,
-                serverTimeZone,
-                dbzProperties,
-                serverId,
-                enableParallelRead,
-                splitSize,
-                fetchSize,
-                connectTimeout,
-                startupOptions);
+        MySqlTableSource source =
+                new MySqlTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        tableName,
+                        username,
+                        password,
+                        serverTimeZone,
+                        dbzProperties,
+                        serverId,
+                        enableParallelRead,
+                        splitSize,
+                        fetchSize,
+                        connectTimeout,
+                        startupOptions);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
     }
 
     @Override
@@ -247,7 +308,9 @@ public class MySqlTableSource implements ScanTableSource {
                 && Objects.equals(serverTimeZone, that.serverTimeZone)
                 && Objects.equals(dbzProperties, that.dbzProperties)
                 && Objects.equals(connectTimeout, that.connectTimeout)
-                && Objects.equals(startupOptions, that.startupOptions);
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys);
     }
 
     @Override
@@ -267,7 +330,9 @@ public class MySqlTableSource implements ScanTableSource {
                 splitSize,
                 fetchSize,
                 connectTimeout,
-                startupOptions);
+                startupOptions,
+                producedDataType,
+                metadataKeys);
     }
 
     @Override

diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index c56c79ecb..915423a93 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -39,12 +39,16 @@ import java.sql.Connection;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static com.ververica.cdc.connectors.mysql.MySqlSourceTest.currentMySqlLatestOffset;
 import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.junit.Assert.assertEquals;
 
 /** Integration tests for MySQL binlog SQL source. */
 @RunWith(Parameterized.class)
@@ -479,6 +483,106 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
         result.getJobClient().get().cancel().get();
     }
 
+    @Test
+    public void testMetadataColumns() throws Exception {
+        userDatabase1.createAndInitialize();
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE mysql_users ("
+                                + " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+                                + " table_name STRING METADATA VIRTUAL,"
+                                + " `id` DECIMAL(20, 0) NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " email STRING,"
+                                + " age INT,"
+                                + " primary key (`id`) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'debezium.internal.implementation' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = '%s',"
+                                + " 'server-id' = '%s',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '%s'"
+                                + ")",
+                        MYSQL_CONTAINER.getHost(),
+                        MYSQL_CONTAINER.getDatabasePort(),
+                        userDatabase1.getUsername(),
+                        userDatabase1.getPassword(),
+                        userDatabase1.getDatabaseName(),
+                        "user_table_.*",
+                        getDezImplementation(),
+                        incrementalSnapshot,
+                        getServerId(),
+                        getSplitSize());
+
+        String sinkDDL =
+                "CREATE TABLE sink ("
+                        + " database_name STRING,"
+                        + " table_name STRING,"
+                        + " `id` DECIMAL(20, 0) NOT NULL,"
+                        + " name STRING,"
+                        + " address STRING,"
+                        + " phone_number STRING,"
+                        + " email STRING,"
+                        + " age INT,"
+                        + " primary key (database_name, table_name, id) not enforced"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ")";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        // submit the job asynchronously
+        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM mysql_users");
+
+        // wait for the snapshot to finish and binlog reading to begin
+        waitForSinkSize("sink", 2);
+
+        try (Connection connection = userDatabase1.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+
+            statement.execute(
+                    "INSERT INTO user_table_1_2 VALUES (200,'user_200','Wuhan',123567891234);");
+            statement.execute(
+                    "INSERT INTO user_table_1_1 VALUES (300,'user_300','Hangzhou',123567891234, 'user_300@foo.com');");
+            statement.execute("UPDATE user_table_1_1 SET address='Beijing' WHERE id=300;");
+            statement.execute("UPDATE user_table_1_2 SET phone_number=88888888 WHERE id=121;");
+            statement.execute("DELETE FROM user_table_1_1 WHERE id=111;");
+        }
+
+        // wait for the binlog events to arrive in the sink (5 more events)
+        waitForSinkSize("sink", 7);
+
+        List<String> expected =
+                Stream.of(
+                                "+I[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
+                                "+I[%s, user_table_1_2, 121, user_121, Shanghai, 123567891234, null, null]",
+                                "+I[%s, user_table_1_2, 200, user_200, Wuhan, 123567891234, null, null]",
+                                "+I[%s, user_table_1_1, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
+                                "+U[%s, user_table_1_1, 300, user_300, Beijing, 123567891234, user_300@foo.com, null]",
+                                "+U[%s, user_table_1_2, 121, user_121, Shanghai, 88888888, null, null]",
+                                "-D[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]")
+                        .map(s -> String.format(s, userDatabase1.getDatabaseName()))
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        // TODO: we can't assert the merged result for incremental-snapshot, because we can't add a
+        // keyby shuffle before the "values" upsert sink.
+        // We should assert the merged result once https://issues.apache.org/jira/browse/FLINK-24511 is fixed.
+        List<String> actual = TestValuesTableFactory.getRawResults("sink");
+        Collections.sort(actual);
+        assertEquals(expected, actual);
+        result.getJobClient().get().cancel().get();
+    }
+
     @Test
     public void testStartupFromLatestOffset() throws Exception {
         inventoryDatabase.createAndInitialize();
@@ -622,7 +726,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
     }
 
     @Test
-    public void testInconsistentSchema() throws Exception {
+    public void testShardingTablesWithInconsistentSchema() throws Exception {
         userDatabase1.createAndInitialize();
         userDatabase2.createAndInitialize();
         String sourceDDL =
@@ -652,7 +756,9 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
                         MYSQL_CONTAINER.getDatabasePort(),
                         userDatabase1.getUsername(),
                         userDatabase1.getPassword(),
-                        "user_.*",
+                        String.format(
+                                "(%s|%s)",
+                                userDatabase1.getDatabaseName(), userDatabase2.getDatabaseName()),
                         "user_table_.*",
                         getDezImplementation(),
                         incrementalSnapshot,
@@ -691,22 +797,9 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
         assertEqualsInAnyOrder(
                 Arrays.asList(expected), fetchRows(result.collect(), expected.length));
         result.getJobClient().get().cancel().get();
-
-        // should drop the userDatabase1 and userDatabase2 for the test will run
-        // three times and create multiply databases with name like user_xxx.
-        // otherwise it'll read the database created by previous tests for we use `user_.*` to match
-        // database
-        try (Connection connection = userDatabase1.getJdbcConnection();
-                Statement statement = connection.createStatement()) {
-            statement.execute("drop database " + userDatabase1.getDatabaseName());
-        }
-        try (Connection connection = userDatabase2.getJdbcConnection();
-                Statement statement = connection.createStatement()) {
-            statement.execute("drop database " + userDatabase2.getDatabaseName());
-        }
     }
 
-    @Ignore
+    @Ignore("https://github.com/ververica/flink-cdc-connectors/issues/254")
     @Test
     public void testStartupFromSpecificOffset() throws Exception {
         if (incrementalSnapshot) {
@@ -796,7 +889,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
         result.getJobClient().get().cancel().get();
     }
 
-    @Ignore
+    @Ignore("https://github.com/ververica/flink-cdc-connectors/issues/254")
     @Test
     public void testStartupFromEarliestOffset() throws Exception {
         if (incrementalSnapshot) {
@@ -881,8 +974,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
         result.getJobClient().get().cancel().get();
     }
 
+    @Ignore("https://github.com/ververica/flink-cdc-connectors/issues/254")
     @Test
-    @Ignore
     public void testStartupFromTimestamp() throws Exception {
         if (incrementalSnapshot) {
             // not support yet

diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index cf17aec9b..fc7bbdbbb 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -39,6 +39,7 @@ import java.time.Duration;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -66,6 +67,18 @@ public class MySqlTableSourceFactoryTest {
                     new ArrayList<>(),
                     UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
 
+    private static final ResolvedSchema SCHEMA_WITH_METADATA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("id", DataTypes.BIGINT().notNull()),
+                            Column.physical("name", DataTypes.STRING()),
+                            Column.physical("count", DataTypes.DECIMAL(38, 18)),
+                            Column.metadata("time", DataTypes.TIMESTAMP(3), "op_ts", true),
+                            Column.metadata(
+                                    "_database_name", DataTypes.STRING(), "database_name", true)),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
+
     private static final String MY_LOCALHOST = "localhost";
     private static final String MY_USERNAME = "flinkuser";
     private static final String MY_PASSWORD = "flinkpw";
@@ -332,6 +345,42 @@ public class MySqlTableSourceFactoryTest {
         assertEquals(expectedSource, actualSource);
     }
 
+    @Test
+    public void testMetadataColumns() {
+        Map<String, String> properties = getAllOptions();
+
+        // validation for source
+        DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
+        MySqlTableSource mySqlSource = (MySqlTableSource) actualSource;
+        mySqlSource.applyReadableMetadata(
+                Arrays.asList("op_ts", "database_name"),
+                SCHEMA_WITH_METADATA.toSourceRowDataType());
+        actualSource = mySqlSource.copy();
+
+        MySqlTableSource expectedSource =
+                new MySqlTableSource(
+                        TableSchemaUtils.getPhysicalSchema(
+                                fromResolvedSchema(SCHEMA_WITH_METADATA)),
+                        3306,
+                        MY_LOCALHOST,
+                        MY_DATABASE,
+                        MY_TABLE,
+                        MY_USERNAME,
+                        MY_PASSWORD,
+                        ZoneId.of("UTC"),
+                        PROPERTIES,
+                        null,
+                        false,
+                        SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
+                        SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
+                        CONNECT_TIMEOUT.defaultValue(),
+                        StartupOptions.initial());
+        expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
+        expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
+
+        assertEquals(expectedSource, actualSource);
+    }
+
     @Test
     public void testValidation() {
         // validate illegal port
@@ -422,19 +471,24 @@ public class MySqlTableSourceFactoryTest {
         return options;
     }
 
-    private static DynamicTableSource createTableSource(Map<String, String> options) {
+    private static DynamicTableSource createTableSource(
+            ResolvedSchema schema, Map<String, String> options) {
         return FactoryUtil.createTableSource(
                 null,
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(SCHEMA).toSchema(),
+                                fromResolvedSchema(schema).toSchema(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),
-                        SCHEMA),
+                        schema),
                 new Configuration(),
                 MySqlTableSourceFactoryTest.class.getClassLoader(),
                 false);
     }
+
+    private static DynamicTableSource createTableSource(Map<String, String> options) {
+        return createTableSource(SCHEMA, options);
+    }
 }

diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTimezoneITCase.java
similarity index 98%
rename from flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java
rename to flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTimezoneITCase.java
index 5b13d8af9..59116bbce 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTimezoneITCase.java
@@ -61,9 +61,9 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestB
 
 /** Integration tests to check mysql-cdc works well under different MySQL server timezone. */
 @RunWith(Parameterized.class)
-public class MysqlTimezoneITCase {
+public class MySqlTimezoneITCase {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MysqlTimezoneITCase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlTimezoneITCase.class);
 
     private static TemporaryFolder tempFolder;
     private static File resourceFolder;
     private final StreamExecutionEnvironment env =
@@ -101,13 +101,13 @@ public class MysqlTimezoneITCase {
     }
 
     @Test
-    public void testMysqlServerInBerlin() throws Exception {
-        testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai");
+    public void testMySqlServerInBerlin() throws Exception {
+        testTemporalTypesWithMySqlServerTimezone("Europe/Berlin");
     }
 
     @Test
-    public void testMysqlServerInShanghai() throws Exception {
-        testTemporalTypesWithMySqlServerTimezone("Europe/Berlin");
+    public void testMySqlServerInShanghai() throws Exception {
+        testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai");
     }
 
     private void testTemporalTypesWithMySqlServerTimezone(String timezone) throws Exception {

diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java
index c5c106062..1aa2209ac 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java
@@ -60,11 +60,10 @@ public class RecordsFormatter {
         this.typeInfo =
                 (TypeInformation<RowData>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
         this.deserializationSchema =
-                new RowDataDebeziumDeserializeSchema(
-                        (RowType) dataType.getLogicalType(),
-                        typeInfo,
-                        ((rowData, rowKind) -> {}),
-                        ZoneId.of("UTC"));
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType((RowType) dataType.getLogicalType())
+                        .setResultTypeInfo(typeInfo)
+                        .build();
         this.collector = new SimpleCollector();
         this.rowRowConverter = RowRowConverter.create(dataType);
         rowRowConverter.open(Thread.currentThread().getContextClassLoader());

diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java
index 049d19f31..dd1eb06b8 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java
@@ -33,7 +33,6 @@ import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
 
-import java.time.ZoneId;
 import java.util.Objects;
 import java.util.Properties;
 
@@ -97,12 +96,13 @@ public class PostgreSQLTableSource implements ScanTableSource {
         RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
         TypeInformation<RowData> typeInfo =
                 scanContext.createTypeInformation(physicalSchema.toRowDataType());
+
         DebeziumDeserializationSchema<RowData> deserializer =
-                new RowDataDebeziumDeserializeSchema(
-                        rowType,
-                        typeInfo,
-                        new PostgresValueValidator(schemaName, tableName),
-                        ZoneId.of("UTC"));
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(rowType)
+                        .setResultTypeInfo(typeInfo)
+                        .setValueValidator(new PostgresValueValidator(schemaName, tableName))
+                        .build();
         DebeziumSourceFunction<RowData> sourceFunction =
                 PostgreSQLSource.<RowData>builder()
                         .hostname(hostname)