diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
index 3443ad8d8..307abc138 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -119,6 +119,13 @@ pipeline:
String |
是否通过FE重定向写入,直连BE写入 |
+
+ charset-encoding |
+ optional |
+ UTF-8 |
+ String |
+ Doris HTTP 客户端使用的字符集编码,默认为 UTF-8 |
+
sink.enable.batch-mode |
optional |
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index 2ff209442..0de46fd49 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -298,6 +298,13 @@ pipeline:
这是一项实验性功能。
+
+ include-comments.enabled |
+ optional |
+ false |
+ Boolean |
+ 是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。 |
+
diff --git a/docs/content/docs/connectors/pipeline-connectors/doris.md b/docs/content/docs/connectors/pipeline-connectors/doris.md
index df740c1a9..cee412c16 100644
--- a/docs/content/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -119,6 +119,13 @@ pipeline:
String |
Whether to write through FE redirection and directly connect to BE to write |
+
+ charset-encoding |
+ optional |
+ UTF-8 |
+ String |
+ Charset encoding used by the Doris HTTP client. Defaults to UTF-8. |
+
sink.enable.batch-mode |
optional |
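
For reference, a minimal pipeline sink block with the new option might look like the sketch below; the FE address and credentials are placeholders, not values from this patch, and `charset-encoding` is shown explicitly even though UTF-8 is already the default.

```yaml
sink:
  type: doris
  fenodes: 127.0.0.1:8030   # placeholder FE address
  username: root            # placeholder credentials
  password: ""
  charset-encoding: UTF-8   # charset used by the Doris HTTP client; UTF-8 is the default
```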
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 29c6549e5..df0faec4d 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -305,6 +305,14 @@ pipeline:
This is an experimental feature, and subject to change in the future.
+
+ include-comments.enabled |
+ optional |
+ false |
+ Boolean |
+ Whether to include table and column comments. Defaults to false. If set to true, table and column comments will be captured and sent downstream.
+ Note: enabling this option increases memory usage. |
+
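
As a rough sketch (hostname, credentials, and table pattern are placeholders), the new option sits alongside the usual MySQL connection settings in the pipeline source definition:

```yaml
source:
  type: mysql
  hostname: localhost        # placeholder connection settings
  port: 3306
  username: root
  password: pass
  tables: app_db.\.*
  server-time-zone: UTC
  # Off by default; enabling it maps to Debezium's include.schema.comments
  # and carries a memory-usage cost.
  include-comments.enabled: true
```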
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
index e452ba547..61f7d36bc 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.StringUtils;
import javax.annotation.Nullable;
@@ -239,6 +240,9 @@ public class Schema implements Serializable {
if (!partitionKeys.isEmpty()) {
sb.append(", partitionKeys=").append(String.join(";", partitionKeys));
}
+ if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+ sb.append(", comment=").append(comment);
+ }
sb.append(", options=").append(describeOptions());
return sb.toString();
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
index 0cee7828a..3257debfe 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
@@ -28,6 +28,7 @@ limitations under the License.
24.0.1
+ <mysql.connector.version>8.0.26</mysql.connector.version>
@@ -84,13 +85,13 @@ limitations under the License.
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
- <version>1.18.3</version>
+ <version>${testcontainers.version}</version>
<scope>test</scope>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
- <version>8.0.26</version>
+ <version>${mysql.connector.version}</version>
<scope>test</scope>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
index d0567ab03..4891309d4 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
@@ -39,6 +39,7 @@ import java.util.Set;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.AUTO_REDIRECT;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.JDBC_URL;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
@@ -143,6 +144,7 @@ public class DorisDataSinkFactory implements DataSinkFactory {
options.add(JDBC_URL);
options.add(PASSWORD);
options.add(AUTO_REDIRECT);
+ options.add(CHARSET_ENCODING);
options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
index 62613c059..5d58bbc82 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
@@ -61,6 +61,12 @@ public class DorisDataSinkOptions {
.withDescription(
"Use automatic redirection of fe without explicitly obtaining the be list");
+ public static final ConfigOption<String> CHARSET_ENCODING =
+ ConfigOptions.key("charset-encoding")
+ .stringType()
+ .defaultValue("UTF-8")
+ .withDescription("Charset encoding used by the Doris HTTP client. Defaults to UTF-8.");
+
// Streaming Sink options
public static final ConfigOption<Boolean> SINK_ENABLE_2PC =
ConfigOptions.key("sink.enable-2pc")
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index ccf6d8798..1d6476152 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -64,6 +64,7 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUM
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
/** Supports schema evolution for {@link DorisDataSink}. */
@@ -76,7 +77,8 @@ public class DorisMetadataApplier implements MetadataApplier {
public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) {
this.dorisOptions = dorisOptions;
- this.schemaChangeManager = new DorisSchemaChangeManager(dorisOptions);
+ this.schemaChangeManager =
+ new DorisSchemaChangeManager(dorisOptions, config.get(CHARSET_ENCODING));
this.config = config;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
@@ -147,6 +149,7 @@ public class DorisMetadataApplier implements MetadataApplier {
tableSchema.setDatabase(tableId.getSchemaName());
tableSchema.setFields(buildFields(schema));
tableSchema.setDistributeKeys(buildDistributeKeys(schema));
+ tableSchema.setTableComment(schema.comment());
if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
tableSchema.setModel(DataModel.DUPLICATE);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java
index a4636f045..5358faf78 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java
@@ -27,8 +27,8 @@ import static org.apache.doris.flink.catalog.doris.DorisSystem.identifier;
/** An enriched version of Doris' {@link SchemaChangeManager}. */
public class DorisSchemaChangeManager extends SchemaChangeManager {
- public DorisSchemaChangeManager(DorisOptions dorisOptions) {
- super(dorisOptions);
+ public DorisSchemaChangeManager(DorisOptions dorisOptions, String charsetEncoding) {
+ super(dorisOptions, charsetEncoding);
}
public boolean truncateTable(String databaseName, String tableName)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 408be27e9..41e60890e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -37,9 +37,11 @@ import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +68,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
@@ -132,6 +135,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+ boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);
Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
@@ -152,6 +156,13 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
Map<String, String> configMap = config.toMap();
OptionUtils.printOptions(IDENTIFIER, config.toMap());
+ if (includeComments) {
+ // set debezium config 'include.schema.comments' to true
+ configMap.put(
+ DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ + RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
+ "true");
+ }
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
@@ -310,6 +321,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
options.add(METADATA_LIST);
+ options.add(INCLUDE_COMMENTS_ENABLED);
return options;
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
index ae676b37a..7b4ee5eab 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
@@ -31,6 +31,8 @@ import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEm
import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
+
import java.util.ArrayList;
import java.util.List;
@@ -57,11 +59,19 @@ public class MySqlDataSource implements DataSource {
@Override
public EventSourceProvider getEventSourceProvider() {
+ boolean includeComments =
+ sourceConfig
+ .getDbzConfiguration()
+ .getBoolean(
+ RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
+ false);
+
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
- readableMetadataList);
+ readableMetadataList,
+ includeComments);
MySqlSource<Event> source =
new MySqlSource<>(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index aa682c211..374721244 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -289,4 +289,13 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
+
+ @Experimental
+ public static final ConfigOption<Boolean> INCLUDE_COMMENTS_ENABLED =
+ ConfigOptions.key("include-comments.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to include table and column comments. Defaults to false. If set to true, table and column comments will be captured and sent downstream. "
+ + "Note: enabling this option increases memory usage.");
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
index a50ac8729..004fc5e1a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
@@ -62,6 +62,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final boolean includeSchemaChanges;
+ private final boolean includeComments;
private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customParser;
@@ -70,23 +71,25 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
- this(changelogMode, includeSchemaChanges, new ArrayList<>());
+ this(changelogMode, includeSchemaChanges, new ArrayList<>(), false);
}
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
- List<MySqlReadableMetadata> readableMetadataList) {
+ List<MySqlReadableMetadata> readableMetadataList,
+ boolean includeComments) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
this.readableMetadataList = readableMetadataList;
+ this.includeComments = includeComments;
}
@Override
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
if (customParser == null) {
- customParser = new CustomMySqlAntlrDdlParser();
+ customParser = new CustomMySqlAntlrDdlParser(includeComments);
tables = new Tables();
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
index 1264aa8d6..624d1ac41 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
@@ -23,6 +23,7 @@ import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
+import io.debezium.relational.Tables;
import java.sql.Types;
import java.util.ArrayList;
@@ -35,8 +36,8 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
private final LinkedList<SchemaChangeEvent> parsedEvents;
- public CustomMySqlAntlrDdlParser() {
- super();
+ public CustomMySqlAntlrDdlParser(boolean includeComments) {
+ super(true, false, includeComments, null, Tables.TableFilter.includeAll());
this.parsedEvents = new LinkedList<>();
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 4f801e0fa..b5a6ec197 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -36,6 +36,7 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
@@ -211,6 +212,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter {
column.comment(),
column.defaultValueExpression().orElse(null));
}
+ tableBuilder.comment(table.comment());
List<String> primaryKey = table.primaryKeyColumnNames();
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
@@ -229,7 +231,16 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter {
private synchronized MySqlAntlrDdlParser getParser() {
if (mySqlAntlrDdlParser == null) {
- mySqlAntlrDdlParser = new MySqlAntlrDdlParser();
+ boolean includeComments =
+ sourceConfig
+ .getDbzConfiguration()
+ .getBoolean(
+ RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS
+ .name(),
+ false);
+ mySqlAntlrDdlParser =
+ new MySqlAntlrDdlParser(
+ true, false, includeComments, null, Tables.TableFilter.includeAll());
}
return mySqlAntlrDdlParser;
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 06cfa5628..1252733f4 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -75,7 +75,14 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
@@ -931,6 +938,98 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
actual.stream().map(Object::toString).collect(Collectors.toList()));
}
+ @Test
+ public void testIncludeComments() throws Exception {
+ env.setParallelism(1);
+ inventoryDatabase.createAndInitialize();
+ TableId tableId =
+ TableId.tableId(inventoryDatabase.getDatabaseName(), "products_with_comments");
+
+ String createTableSql =
+ String.format(
+ "CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
+ + " id INTEGER NOT NULL AUTO_INCREMENT COMMENT 'column comment of id' PRIMARY KEY,\n"
+ + " name VARCHAR(255) NOT NULL DEFAULT 'flink' COMMENT 'column comment of name',\n"
+ + " weight FLOAT(6) COMMENT 'column comment of weight'\n"
+ + ")\n"
+ + "COMMENT 'table comment of products';",
+ inventoryDatabase.getDatabaseName(), "products_with_comments");
+ executeSql(inventoryDatabase, createTableSql);
+
+ Map<String, String> options = new HashMap<>();
+ options.put(HOSTNAME.key(), MYSQL8_CONTAINER.getHost());
+ options.put(PORT.key(), String.valueOf(MYSQL8_CONTAINER.getDatabasePort()));
+ options.put(USERNAME.key(), TEST_USER);
+ options.put(PASSWORD.key(), TEST_PASSWORD);
+ options.put(SERVER_TIME_ZONE.key(), "UTC");
+ options.put(INCLUDE_COMMENTS_ENABLED.key(), "true");
+ options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".products_with_comments");
+ Factory.Context context =
+ new FactoryHelper.DefaultContext(
+ Configuration.fromMap(options), null, this.getClass().getClassLoader());
+
+ MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+ MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider) dataSource.getEventSourceProvider();
+
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ MySqlDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+ Thread.sleep(5_000);
+
+ // add some column
+ String addColumnSql =
+ String.format(
+ "ALTER TABLE `%s`.`products_with_comments` ADD COLUMN `description` VARCHAR(512) comment 'column comment of description';",
+ inventoryDatabase.getDatabaseName());
+ executeSql(inventoryDatabase, addColumnSql);
+
+ List<Event> expectedEvents = getEventsWithComments(tableId);
+ List<Event> actual = fetchResults(events, expectedEvents.size());
+ assertEqualsInAnyOrder(
+ expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
+ actual.stream().map(Object::toString).collect(Collectors.toList()));
+ }
+
+ private void executeSql(UniqueDatabase database, String sql) throws SQLException {
+ try (Connection connection = database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ }
+ }
+
+ private List<Event> getEventsWithComments(TableId tableId) {
+ return Arrays.asList(
+ new CreateTableEvent(
+ tableId,
+ Schema.newBuilder()
+ .physicalColumn(
+ "id", DataTypes.INT().notNull(), "column comment of id")
+ .physicalColumn(
+ "name",
+ DataTypes.VARCHAR(255).notNull(),
+ "column comment of name",
+ "flink")
+ .physicalColumn(
+ "weight", DataTypes.FLOAT(), "column comment of weight")
+ .primaryKey(Collections.singletonList("id"))
+ .comment("table comment of products")
+ .build()),
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "description",
+ DataTypes.VARCHAR(512),
+ "column comment of description")))));
+ }
+
private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 74601ba13..1c79ae8f9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -183,6 +183,7 @@ public class PaimonMetadataApplier implements MetadataApplier {
}
builder.partitionKeys(partitionKeys)
.primaryKey(primaryKeys)
+ .comment(schema.comment())
.options(tableOptions)
.options(schema.options());
catalog.createTable(tableIdToIdentifier(event), builder.build(), true);