[FLINK-34865][pipeline-connector/mysql] Support sync table and column comment

This closes #3482

Co-authored-by: Leonard Xu <xbjtdcq@gmail.com>
pull/3863/head
North Lin 2 weeks ago committed by GitHub
parent 50e6c82870
commit b5d9673258
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -119,6 +119,13 @@ pipeline:
<td>String</td>
<td> 是否通过FE重定向写入直连BE写入 </td>
</tr>
<tr>
<td>charset-encoding</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td> Doris Http客户端字符集编码默认UTF-8 </td>
</tr>
<tr>
<td>sink.enable.batch-mode</td>
<td>optional</td>

@ -298,6 +298,13 @@ pipeline:
这是一项实验性功能。
</td>
</tr>
<tr>
<td>include-comments.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。</td>
</tr>
</tbody>
</table>
</div>

@ -119,6 +119,13 @@ pipeline:
<td>String</td>
<td> Whether to write through FE redirection and directly connect to BE to write </td>
</tr>
<tr>
<td>charset-encoding</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td> Charset encoding for doris http client, default UTF-8 </td>
</tr>
<tr>
<td>sink.enable.batch-mode</td>
<td>optional</td>

@ -305,6 +305,14 @@ pipeline:
This is an experimental feature, and subject to change in the future.
</td>
</tr>
<tr>
<td>include-comments.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether enable include table and column comments, by default is false, if set to true, the table and column comments will be sent.<br>
Note: Enable this option will bring the implications on memory usage.</td>
</tr>
</tbody>
</table>
</div>

@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.StringUtils;
import javax.annotation.Nullable;
@ -239,6 +240,9 @@ public class Schema implements Serializable {
if (!partitionKeys.isEmpty()) {
sb.append(", partitionKeys=").append(String.join(";", partitionKeys));
}
if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
sb.append(", comment=").append(comment);
}
sb.append(", options=").append(describeOptions());
return sb.toString();

@ -28,6 +28,7 @@ limitations under the License.
<properties>
<doris.connector.version>24.0.1</doris.connector.version>
<mysql.connector.version>8.0.26</mysql.connector.version>
</properties>
<dependencies>
@ -84,13 +85,13 @@ limitations under the License.
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>1.18.3</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<version>${mysql.connector.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

@ -39,6 +39,7 @@ import java.util.Set;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.AUTO_REDIRECT;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.JDBC_URL;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
@ -143,6 +144,7 @@ public class DorisDataSinkFactory implements DataSinkFactory {
options.add(JDBC_URL);
options.add(PASSWORD);
options.add(AUTO_REDIRECT);
options.add(CHARSET_ENCODING);
options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);

@ -61,6 +61,12 @@ public class DorisDataSinkOptions {
.withDescription(
"Use automatic redirection of fe without explicitly obtaining the be list");
public static final ConfigOption<String> CHARSET_ENCODING =
ConfigOptions.key("charset-encoding")
.stringType()
.defaultValue("UTF-8")
.withDescription("Charset encoding for doris http client, default UTF-8.");
// Streaming Sink options
public static final ConfigOption<Boolean> SINK_ENABLE_2PC =
ConfigOptions.key("sink.enable-2pc")

@ -64,6 +64,7 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUM
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
/** Supports {@link DorisDataSink} to schema evolution. */
@ -76,7 +77,8 @@ public class DorisMetadataApplier implements MetadataApplier {
public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) {
this.dorisOptions = dorisOptions;
this.schemaChangeManager = new DorisSchemaChangeManager(dorisOptions);
this.schemaChangeManager =
new DorisSchemaChangeManager(dorisOptions, config.get(CHARSET_ENCODING));
this.config = config;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
@ -147,6 +149,7 @@ public class DorisMetadataApplier implements MetadataApplier {
tableSchema.setDatabase(tableId.getSchemaName());
tableSchema.setFields(buildFields(schema));
tableSchema.setDistributeKeys(buildDistributeKeys(schema));
tableSchema.setTableComment(schema.comment());
if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
tableSchema.setModel(DataModel.DUPLICATE);

@ -27,8 +27,8 @@ import static org.apache.doris.flink.catalog.doris.DorisSystem.identifier;
/** An enriched version of Doris' {@link SchemaChangeManager}. */
public class DorisSchemaChangeManager extends SchemaChangeManager {
public DorisSchemaChangeManager(DorisOptions dorisOptions) {
super(dorisOptions);
public DorisSchemaChangeManager(DorisOptions dorisOptions, String charsetEncoding) {
super(dorisOptions, charsetEncoding);
}
public boolean truncateTable(String databaseName, String tableName)

@ -37,9 +37,11 @@ import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,6 +68,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
@ -132,6 +135,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);
Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
@ -152,6 +156,13 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
Map<String, String> configMap = config.toMap();
OptionUtils.printOptions(IDENTIFIER, config.toMap());
if (includeComments) {
// set debezium config 'include.schema.comments' to true
configMap.put(
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
"true");
}
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
@ -310,6 +321,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
options.add(METADATA_LIST);
options.add(INCLUDE_COMMENTS_ENABLED);
return options;
}

@ -31,6 +31,8 @@ import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEm
import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import java.util.ArrayList;
import java.util.List;
@ -57,11 +59,19 @@ public class MySqlDataSource implements DataSource {
@Override
public EventSourceProvider getEventSourceProvider() {
boolean includeComments =
sourceConfig
.getDbzConfiguration()
.getBoolean(
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
false);
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
readableMetadataList);
readableMetadataList,
includeComments);
MySqlSource<Event> source =
new MySqlSource<>(

@ -289,4 +289,13 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
@Experimental
public static final ConfigOption<Boolean> INCLUDE_COMMENTS_ENABLED =
ConfigOptions.key("include-comments.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether enable include table and column comments, by default is false, if set to true, table and column comments will be sent. "
+ "Note: Enable this option will bring the implications on memory usage.");
}

@ -62,6 +62,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final boolean includeSchemaChanges;
private final boolean includeComments;
private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customParser;
@ -70,23 +71,25 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
this(changelogMode, includeSchemaChanges, new ArrayList<>());
this(changelogMode, includeSchemaChanges, new ArrayList<>(), false);
}
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
List<MySqlReadableMetadata> readableMetadataList) {
List<MySqlReadableMetadata> readableMetadataList,
boolean includeComments) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
this.readableMetadataList = readableMetadataList;
this.includeComments = includeComments;
}
@Override
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
if (customParser == null) {
customParser = new CustomMySqlAntlrDdlParser();
customParser = new CustomMySqlAntlrDdlParser(includeComments);
tables = new Tables();
}

@ -23,6 +23,7 @@ import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.relational.Tables;
import java.sql.Types;
import java.util.ArrayList;
@ -35,8 +36,8 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
private final LinkedList<SchemaChangeEvent> parsedEvents;
public CustomMySqlAntlrDdlParser() {
super();
public CustomMySqlAntlrDdlParser(boolean includeComments) {
super(true, false, includeComments, null, Tables.TableFilter.includeAll());
this.parsedEvents = new LinkedList<>();
}

@ -36,6 +36,7 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
@ -211,6 +212,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
column.comment(),
column.defaultValueExpression().orElse(null));
}
tableBuilder.comment(table.comment());
List<String> primaryKey = table.primaryKeyColumnNames();
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
@ -229,7 +231,16 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
private synchronized MySqlAntlrDdlParser getParser() {
if (mySqlAntlrDdlParser == null) {
mySqlAntlrDdlParser = new MySqlAntlrDdlParser();
boolean includeComments =
sourceConfig
.getDbzConfiguration()
.getBoolean(
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS
.name(),
false);
mySqlAntlrDdlParser =
new MySqlAntlrDdlParser(
true, false, includeComments, null, Tables.TableFilter.includeAll());
}
return mySqlAntlrDdlParser;
}

@ -75,7 +75,14 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
@ -931,6 +938,98 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
actual.stream().map(Object::toString).collect(Collectors.toList()));
}
@Test
public void testIncludeComments() throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
TableId tableId =
TableId.tableId(inventoryDatabase.getDatabaseName(), "products_with_comments");
String createTableSql =
String.format(
"CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
+ " id INTEGER NOT NULL AUTO_INCREMENT COMMENT 'column comment of id' PRIMARY KEY,\n"
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink' COMMENT 'column comment of name',\n"
+ " weight FLOAT(6) COMMENT 'column comment of weight'\n"
+ ")\n"
+ "COMMENT 'table comment of products';",
inventoryDatabase.getDatabaseName(), "products_with_comments");
executeSql(inventoryDatabase, createTableSql);
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL8_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL8_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(SERVER_TIME_ZONE.key(), "UTC");
options.put(INCLUDE_COMMENTS_ENABLED.key(), "true");
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".products_with_comments");
Factory.Context context =
new FactoryHelper.DefaultContext(
Configuration.fromMap(options), null, this.getClass().getClassLoader());
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) dataSource.getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
Thread.sleep(5_000);
// add some column
String addColumnSql =
String.format(
"ALTER TABLE `%s`.`products_with_comments` ADD COLUMN `description` VARCHAR(512) comment 'column comment of description';",
inventoryDatabase.getDatabaseName());
executeSql(inventoryDatabase, addColumnSql);
List<Event> expectedEvents = getEventsWithComments(tableId);
List<Event> actual = fetchResults(events, expectedEvents.size());
assertEqualsInAnyOrder(
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
actual.stream().map(Object::toString).collect(Collectors.toList()));
}
private void executeSql(UniqueDatabase database, String sql) throws SQLException {
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
}
}
private List<Event> getEventsWithComments(TableId tableId) {
return Arrays.asList(
new CreateTableEvent(
tableId,
Schema.newBuilder()
.physicalColumn(
"id", DataTypes.INT().notNull(), "column comment of id")
.physicalColumn(
"name",
DataTypes.VARCHAR(255).notNull(),
"column comment of name",
"flink")
.physicalColumn(
"weight", DataTypes.FLOAT(), "column comment of weight")
.primaryKey(Collections.singletonList("id"))
.comment("table comment of products")
.build()),
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"description",
DataTypes.VARCHAR(512),
"column comment of description")))));
}
private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,

@ -183,6 +183,7 @@ public class PaimonMetadataApplier implements MetadataApplier {
}
builder.partitionKeys(partitionKeys)
.primaryKey(primaryKeys)
.comment(schema.comment())
.options(tableOptions)
.options(schema.options());
catalog.createTable(tableIdToIdentifier(event), builder.build(), true);

Loading…
Cancel
Save