diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java index 31fc5ba82..df69f6126 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java @@ -20,6 +20,7 @@ import com.ververica.cdc.common.annotation.PublicEvolving; import com.ververica.cdc.common.data.RecordData; import java.io.Serializable; +import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -82,7 +83,8 @@ public class DataChangeEvent implements ChangeEvent, Serializable { /** Creates a {@link DataChangeEvent} instance that describes the insert event. */ public static DataChangeEvent insertEvent(TableId tableId, RecordData after) { - return new DataChangeEvent(tableId, null, after, OperationType.INSERT, null); + return new DataChangeEvent( + tableId, null, after, OperationType.INSERT, Collections.emptyMap()); } /** @@ -95,7 +97,8 @@ public class DataChangeEvent implements ChangeEvent, Serializable { /** Creates a {@link DataChangeEvent} instance that describes the delete event. */ public static DataChangeEvent deleteEvent(TableId tableId, RecordData before) { - return new DataChangeEvent(tableId, before, null, OperationType.DELETE, null); + return new DataChangeEvent( + tableId, before, null, OperationType.DELETE, Collections.emptyMap()); } /** @@ -109,7 +112,8 @@ public class DataChangeEvent implements ChangeEvent, Serializable { /** Creates a {@link DataChangeEvent} instance that describes the update event. */ public static DataChangeEvent updateEvent( TableId tableId, RecordData before, RecordData after) { - return new DataChangeEvent(tableId, before, after, OperationType.UPDATE, null); + return new DataChangeEvent( + tableId, before, after, OperationType.UPDATE, Collections.emptyMap()); } /** @@ -122,7 +126,8 @@ public class DataChangeEvent implements ChangeEvent, Serializable { /** Creates a {@link DataChangeEvent} instance that describes the replace event. */ public static DataChangeEvent replaceEvent(TableId tableId, RecordData after) { - return new DataChangeEvent(tableId, null, after, OperationType.REPLACE, null); + return new DataChangeEvent( + tableId, null, after, OperationType.REPLACE, Collections.emptyMap()); } /** diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/EventDeserializer.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/EventDeserializer.java new file mode 100644 index 000000000..1673b6d31 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/EventDeserializer.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.common.event; + +import com.ververica.cdc.common.annotation.Public; + +import java.io.Serializable; +import java.util.List; + +/** Deserializer to deserialize given record to {@link Event}. */ +@Public +public interface EventDeserializer<T> extends Serializable { + + /** Deserialize given record to {@link Event}s. */ + List<Event> deserialize(T record) throws Exception; +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/MetadataAccessor.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/MetadataAccessor.java index 275255d76..cfc6488cc 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/MetadataAccessor.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/source/MetadataAccessor.java @@ -31,15 +31,39 @@ import java.util.List; @PublicEvolving public interface MetadataAccessor { - /** List all namespaces from external systems. */ - List<String> listNamespaces() throws UnsupportedOperationException; + /** + * List all namespaces from external systems. + * + * @return The list of namespaces + * @throws UnsupportedOperationException Thrown if the external system does not support + * namespaces. + */ + List<String> listNamespaces(); - /** List schemas by namespace from external systems. */ - List<String> listSchemas(@Nullable String namespace) throws UnsupportedOperationException; + /** + * List all schemas from external systems. + * + * @param namespace The namespace to list schemas from. If null, list schemas from all + * namespaces. + * @return The list of schemas + * @throws UnsupportedOperationException Thrown if the external system does not support schemas. + */ + List<String> listSchemas(@Nullable String namespace); - /** List tables by namespace and schema from external systems. */ + /** + * List tables by namespace and schema from external systems. + * + * @param namespace The namespace to list tables from. If null, list tables from all namespaces. + * @param schemaName The schema to list tables from. If null, list tables from all schemas. + * @return The list of {@link TableId}s. + */ List<TableId> listTables(@Nullable String namespace, @Nullable String schemaName); - /** Get the schema of the given table. */ + /** + * Get the {@link Schema} of the given table. + * + * @param tableId The {@link TableId} of the given table. + * @return The {@link Schema} of the table. 
+ */ Schema getTableSchema(TableId tableId); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml new file mode 100644 index 000000000..05613f108 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml @@ -0,0 +1,153 @@ + + + + + + com.ververica + flink-cdc-pipeline-connectors + ${revision} + + 4.0.0 + + flink-cdc-pipeline-connector-mysql + + + + + + + com.ververica + flink-connector-mysql-cdc + ${project.version} + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + com.ververica + flink-connector-mysql-cdc + ${project.version} + test + test-jar + + + org.testcontainers + mysql + ${testcontainers.version} + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + shade-flink + package + + shade + + + false + + + io.debezium:debezium-api + io.debezium:debezium-embedded + io.debezium:debezium-core + io.debezium:debezium-ddl-parser + io.debezium:debezium-connector-mysql + com.ververica:flink-connector-debezium + com.ververica:flink-connector-mysql-cdc + org.antlr:antlr4-runtime + org.apache.kafka:* + mysql:mysql-connector-java + com.zendesk:mysql-binlog-connector-java + com.fasterxml.*:* + com.google.guava:* + com.esri.geometry:esri-geometry-api + com.zaxxer:HikariCP + + org.apache.flink:flink-shaded-guava + + + + + org.apache.kafka:* + + kafka/kafka-version.properties + LICENSE + + NOTICE + common/** + + + + + + org.apache.kafka + + com.ververica.cdc.connectors.shaded.org.apache.kafka + + + + org.antlr + + com.ververica.cdc.connectors.shaded.org.antlr + + + + com.fasterxml + + com.ververica.cdc.connectors.shaded.com.fasterxml + + + + com.google + + com.ververica.cdc.connectors.shaded.com.google + + + + com.esri.geometry + com.ververica.cdc.connectors.shaded.com.esri.geometry + + + com.zaxxer + + com.ververica.cdc.connectors.shaded.com.zaxxer + + + + + + + + + + \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java new file mode 100644 index 000000000..ad8332220 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -0,0 +1,356 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.connectors.mysql.factory; + +import org.apache.flink.table.api.ValidationException; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.configuration.ConfigOption; +import com.ververica.cdc.common.configuration.Configuration; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.factories.DataSourceFactory; +import com.ververica.cdc.common.factories.Factory; +import com.ververica.cdc.common.schema.Selectors; +import com.ververica.cdc.common.source.DataSource; +import com.ververica.cdc.connectors.mysql.source.MySqlDataSource; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; +import com.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.ververica.cdc.connectors.mysql.utils.MySqlSchemaUtils; +import com.ververica.cdc.connectors.mysql.utils.OptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.ZoneId; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECTION_POOL_SIZE; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_MAX_RETRIES; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS; +import static 
com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; +import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; +import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; +import static com.ververica.cdc.debezium.utils.JdbcUrlUtils.getJdbcProperties; +import static org.apache.flink.util.Preconditions.checkState; + +/** A {@link Factory} to create {@link MySqlDataSource}. */ +@Internal +public class MySqlDataSourceFactory implements DataSourceFactory { + + private static final Logger LOG = LoggerFactory.getLogger(MySqlDataSourceFactory.class); + + public static final String IDENTIFIER = "mysql"; + + @Override + public DataSource createDataSource(Context context) { + final Configuration config = context.getConfiguration(); + String hostname = config.get(HOSTNAME); + int port = config.get(PORT); + + String username = config.get(USERNAME); + String password = config.get(PASSWORD); + String tables = config.get(TABLES); + + String serverId = validateAndGetServerId(config); + ZoneId serverTimeZone = getServerTimeZone(config); + StartupOptions startupOptions = getStartupOptions(config); + + boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED); + + int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE); + int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); + int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE); + + double distributionFactorUpper = + config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + double distributionFactorLower = + config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + + boolean closeIdleReaders = + config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + + Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); + Duration connectTimeout = config.get(CONNECT_TIMEOUT); + int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); + int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); + + validateIntegerOption( + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); + validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); + validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1); + validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1); + validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0); + validateDistributionFactorUpper(distributionFactorUpper); + validateDistributionFactorLower(distributionFactorLower); + + Map configMap = config.toMap(); + OptionUtils.printOptions(IDENTIFIER, config.toMap()); + + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(hostname) + .port(port) + .username(username) + .password(password) + .databaseList(".*") + .tableList(".*") + .startupOptions(startupOptions) + .serverId(serverId) + .serverTimeZone(serverTimeZone.getId()) + .fetchSize(fetchSize) + .splitSize(splitSize) + .splitMetaGroupSize(splitMetaGroupSize) + .distributionFactorLower(distributionFactorLower) + .distributionFactorUpper(distributionFactorUpper) + .heartbeatInterval(heartbeatInterval) + 
.connectTimeout(connectTimeout) + .connectMaxRetries(connectMaxRetries) + .connectionPoolSize(connectionPoolSize) + .closeIdleReaders(closeIdleReaders) + .includeSchemaChanges(includeSchemaChanges) + .debeziumProperties(getDebeziumProperties(configMap)) + .jdbcProperties(getJdbcProperties(configMap)); + + Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build(); + configFactory.tableList(getTableList(configFactory.createConfig(0), selectors)); + + return new MySqlDataSource(configFactory); + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(HOSTNAME); + options.add(USERNAME); + options.add(PASSWORD); + options.add(TABLES); + return options; + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(PORT); + options.add(SERVER_TIME_ZONE); + options.add(SERVER_ID); + options.add(SCAN_STARTUP_MODE); + options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE); + options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS); + options.add(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET); + options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS); + options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS); + options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); + options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); + options.add(CHUNK_META_GROUP_SIZE); + options.add(SCAN_SNAPSHOT_FETCH_SIZE); + options.add(CONNECT_TIMEOUT); + options.add(CONNECTION_POOL_SIZE); + options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + options.add(CONNECT_MAX_RETRIES); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + options.add(HEARTBEAT_INTERVAL); + options.add(SCHEMA_CHANGE_ENABLED); + return options; + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; + private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; + private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset"; + private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp"; + + private static String[] getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) { + return MySqlSchemaUtils.listTables(sourceConfig, null).stream() + .filter(selectors::isMatch) + .map(TableId::toString) + .toArray(String[]::new); + } + + private static StartupOptions getStartupOptions(Configuration config) { + String modeString = config.get(SCAN_STARTUP_MODE); + + switch (modeString.toLowerCase()) { + case SCAN_STARTUP_MODE_VALUE_INITIAL: + return StartupOptions.initial(); + + case SCAN_STARTUP_MODE_VALUE_LATEST: + return StartupOptions.latest(); + + case SCAN_STARTUP_MODE_VALUE_EARLIEST: + return StartupOptions.earliest(); + + case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET: + validateSpecificOffset(config); + return getSpecificOffset(config); + + case SCAN_STARTUP_MODE_VALUE_TIMESTAMP: + return StartupOptions.timestamp( + config.get(SCAN_STARTUP_TIMESTAMP_MILLIS)); + + default: + throw new ValidationException( + String.format( + "Invalid value for option '%s'. 
Supported values are [%s, %s, %s, %s, %s], but was: %s", + SCAN_STARTUP_MODE.key(), + SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_LATEST, + SCAN_STARTUP_MODE_VALUE_EARLIEST, + SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET, + SCAN_STARTUP_MODE_VALUE_TIMESTAMP, + modeString)); + } + } + + private static void validateSpecificOffset(Configuration config) { + Optional gtidSet = + config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET); + Optional binlogFilename = + config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE); + Optional binlogPosition = + config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS); + if (!gtidSet.isPresent() && !(binlogFilename.isPresent() && binlogPosition.isPresent())) { + throw new ValidationException( + String.format( + "Unable to find a valid binlog offset. Either %s, or %s and %s are required.", + SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key(), + SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key(), + SCAN_STARTUP_SPECIFIC_OFFSET_POS.key())); + } + } + + private static StartupOptions getSpecificOffset(Configuration config) { + BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder(); + + // GTID set + config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET) + .ifPresent(offsetBuilder::setGtidSet); + + // Binlog file + pos + Optional binlogFilename = + config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE); + Optional binlogPosition = + config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS); + if (binlogFilename.isPresent() && binlogPosition.isPresent()) { + offsetBuilder.setBinlogFilePosition(binlogFilename.get(), binlogPosition.get()); + } else { + offsetBuilder.setBinlogFilePosition("", 0); + } + + config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS) + .ifPresent(offsetBuilder::setSkipEvents); + config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS) + .ifPresent(offsetBuilder::setSkipRows); + return StartupOptions.specificOffset(offsetBuilder.build()); + } + + private String validateAndGetServerId(Configuration configuration) { + final String serverIdValue = configuration.get(SERVER_ID); + if (serverIdValue != null) { + // validation + try { + ServerIdRange.from(serverIdValue); + } catch (Exception e) { + throw new ValidationException( + String.format( + "The value of option 'server-id' is invalid: '%s'", serverIdValue), + e); + } + } + return serverIdValue; + } + + /** Checks the value of given integer option is valid. */ + private void validateIntegerOption( + ConfigOption option, int optionValue, int exclusiveMin) { + checkState( + optionValue > exclusiveMin, + String.format( + "The value of option '%s' must larger than %d, but is %d", + option.key(), exclusiveMin, optionValue)); + } + + /** Checks the value of given evenly distribution factor upper bound is valid. */ + private void validateDistributionFactorUpper(double distributionFactorUpper) { + checkState( + doubleCompare(distributionFactorUpper, 1.0d) >= 0, + String.format( + "The value of option '%s' must larger than or equals %s, but is %s", + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), + 1.0d, + distributionFactorUpper)); + } + + /** Checks the value of given evenly distribution factor lower bound is valid. 
*/ + private void validateDistributionFactorLower(double distributionFactorLower) { + checkState( + doubleCompare(distributionFactorLower, 0.0d) >= 0 + && doubleCompare(distributionFactorLower, 1.0d) <= 0, + String.format( + "The value of option '%s' must between %s and %s inclusively, but is %s", + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), + 0.0d, + 1.0d, + distributionFactorLower)); + } + + /** Replaces the default timezone placeholder with session timezone, if applicable. */ + private static ZoneId getServerTimeZone(Configuration config) { + final String serverTimeZone = config.get(SERVER_TIME_ZONE); + if (serverTimeZone != null) { + return ZoneId.of(serverTimeZone); + } else { + LOG.warn( + "{} is not set, which might cause data inconsistencies for time-related fields.", + SERVER_TIME_ZONE.key()); + return ZoneId.systemDefault(); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java new file mode 100644 index 000000000..ae333d40e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.source.DataSource; +import com.ververica.cdc.common.source.EventSourceProvider; +import com.ververica.cdc.common.source.FlinkSourceProvider; +import com.ververica.cdc.common.source.MetadataAccessor; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import com.ververica.cdc.debezium.table.DebeziumChangelogMode; + +import java.time.ZoneId; + +/** A {@link DataSource} for mysql cdc connector. 
*/ +@Internal +public class MySqlDataSource implements DataSource { + + private final MySqlSourceConfigFactory configFactory; + private final MySqlSourceConfig sourceConfig; + + public MySqlDataSource(MySqlSourceConfigFactory configFactory) { + this.configFactory = configFactory; + this.sourceConfig = configFactory.createConfig(0); + } + + @Override + public EventSourceProvider getEventSourceProvider() { + MySqlEventDeserializer deserializer = + new MySqlEventDeserializer( + DebeziumChangelogMode.ALL, + ZoneId.of(sourceConfig.getServerTimeZone()), + sourceConfig.isIncludeSchemaChanges()); + return FlinkSourceProvider.of(new MySqlSource<>(configFactory, deserializer)); + } + + @Override + public MetadataAccessor getMetadataAccessor() { + return new MySqlMetadataAccessor(sourceConfig); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java new file mode 100644 index 000000000..72cbdf6fd --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -0,0 +1,232 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source; + +import com.ververica.cdc.common.annotation.Experimental; +import com.ververica.cdc.common.annotation.PublicEvolving; +import com.ververica.cdc.common.configuration.ConfigOption; +import com.ververica.cdc.common.configuration.ConfigOptions; + +import java.time.Duration; + +/** Configurations for {@link MySqlDataSource}. */ +@PublicEvolving +public class MySqlDataSourceOptions { + + public static final ConfigOption<String> HOSTNAME = + ConfigOptions.key("hostname") + .stringType() + .noDefaultValue() + .withDescription("IP address or hostname of the MySQL database server."); + + public static final ConfigOption<Integer> PORT = + ConfigOptions.key("port") + .intType() + .defaultValue(3306) + .withDescription("Integer port number of the MySQL database server."); + + public static final ConfigOption<String> USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription( + "Name of the MySQL database user to use when connecting to the MySQL database server."); + + public static final ConfigOption<String> PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription( + "Password to use when connecting to the MySQL database server."); + public static final ConfigOption<String> TABLES = + ConfigOptions.key("tables") + .stringType() + .noDefaultValue() + .withDescription( + "Table names of the MySQL tables to monitor. Regular expressions are supported. " + + "It is important to note that the dot (.) 
is treated as a delimiter for database and table names. " + + "If there is a need to use a dot (.) in a regular expression to match any character, " + + "it is necessary to escape the dot with a backslash." + + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*"); + + public static final ConfigOption SERVER_TIME_ZONE = + ConfigOptions.key("server-time-zone") + .stringType() + .noDefaultValue() + .withDescription( + "The session time zone in database server. If not set, then " + + "ZoneId.systemDefault() is used to determine the server time zone."); + + public static final ConfigOption SERVER_ID = + ConfigOptions.key("server-id") + .stringType() + .noDefaultValue() + .withDescription( + "A numeric ID or a numeric ID range of this database client, " + + "The numeric ID syntax is like '5400', the numeric ID range syntax " + + "is like '5400-5408', The numeric ID range syntax is recommended when " + + "'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all " + + "currently-running database processes in the MySQL cluster. This connector" + + " joins the MySQL cluster as another server (with this unique ID) " + + "so it can read the binlog. By default, a random number is generated between" + + " 5400 and 6400, though we recommend setting an explicit value."); + + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE = + ConfigOptions.key("scan.incremental.snapshot.chunk.size") + .intType() + .defaultValue(8096) + .withDescription( + "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table."); + + public static final ConfigOption SCAN_SNAPSHOT_FETCH_SIZE = + ConfigOptions.key("scan.snapshot.fetch.size") + .intType() + .defaultValue(1024) + .withDescription( + "The maximum fetch size for per poll when read table snapshot."); + + public static final ConfigOption CONNECT_TIMEOUT = + ConfigOptions.key("connect.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out."); + + public static final ConfigOption CONNECTION_POOL_SIZE = + ConfigOptions.key("connection.pool.size") + .intType() + .defaultValue(20) + .withDescription("The connection pool size."); + + public static final ConfigOption CONNECT_MAX_RETRIES = + ConfigOptions.key("connect.max-retries") + .intType() + .defaultValue(3) + .withDescription( + "The max retry times that the connector should retry to build MySQL database server connection."); + + public static final ConfigOption SCAN_STARTUP_MODE = + ConfigOptions.key("scan.startup.mode") + .stringType() + .defaultValue("initial") + .withDescription( + "Optional startup mode for MySQL CDC consumer, valid enumerations are " + + "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n" + + "or \"specific-offset\""); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSET_FILE = + ConfigOptions.key("scan.startup.specific-offset.file") + .stringType() + .noDefaultValue() + .withDescription( + "Optional binlog file name used in case of \"specific-offset\" startup mode"); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSET_POS = + ConfigOptions.key("scan.startup.specific-offset.pos") + .longType() + .noDefaultValue() + .withDescription( + "Optional binlog file position used in case of \"specific-offset\" startup mode"); + + public static final ConfigOption 
SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET = + ConfigOptions.key("scan.startup.specific-offset.gtid-set") + .stringType() + .noDefaultValue() + .withDescription( + "Optional GTID set used in case of \"specific-offset\" startup mode"); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS = + ConfigOptions.key("scan.startup.specific-offset.skip-events") + .longType() + .noDefaultValue() + .withDescription( + "Optional number of events to skip after the specific starting offset"); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS = + ConfigOptions.key("scan.startup.specific-offset.skip-rows") + .longType() + .noDefaultValue() + .withDescription("Optional number of rows to skip after the specific offset"); + + public static final ConfigOption SCAN_STARTUP_TIMESTAMP_MILLIS = + ConfigOptions.key("scan.startup.timestamp-millis") + .longType() + .noDefaultValue() + .withDescription( + "Optional timestamp used in case of \"timestamp\" startup mode"); + + public static final ConfigOption HEARTBEAT_INTERVAL = + ConfigOptions.key("heartbeat.interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "Optional interval of sending heartbeat event for tracing the latest available binlog offsets"); + + // ---------------------------------------------------------------------------- + // experimental options, won't add them to documentation + // ---------------------------------------------------------------------------- + @Experimental + public static final ConfigOption CHUNK_META_GROUP_SIZE = + ConfigOptions.key("chunk-meta.group.size") + .intType() + .defaultValue(1000) + .withDescription( + "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups."); + + @Experimental + public static final ConfigOption CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound") + .doubleType() + .defaultValue(1000.0d) + .withFallbackKeys("split-key.even-distribution.factor.upper-bound") + .withDescription( + "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + + " table is evenly distribution or not." + + " The table chunks would use evenly calculation optimization when the data distribution is even," + + " and the query MySQL for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound") + .doubleType() + .defaultValue(0.05d) + .withFallbackKeys("split-key.even-distribution.factor.lower-bound") + .withDescription( + "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the" + + " table is evenly distribution or not." + + " The table chunks would use evenly calculation optimization when the data distribution is even," + + " and the query MySQL for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = + ConfigOptions.key("scan.incremental.close-idle-reader.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to close idle readers at the end of the snapshot phase. 
This feature depends on " + "FLIP-147: Support Checkpoints After Tasks Finished. The Flink version is required to be " + "greater than or equal to 1.14 when enabling this feature."); + + @Experimental + public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED = + ConfigOptions.key("schema-change.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Whether to send schema change events, by default is false."); +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java new file mode 100644 index 000000000..7ef4e52b3 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -0,0 +1,92 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.event.SchemaChangeEvent; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.debezium.event.DebeziumEventDeserializationSchema; +import com.ververica.cdc.debezium.table.DebeziumChangelogMode; +import io.debezium.data.Envelope; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import java.time.ZoneId; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Event deserializer for {@link MySqlDataSource}. 
*/ +@Internal +public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { + + private static final long serialVersionUID = 1L; + + public static final String SCHEMA_CHANGE_EVENT_KEY_NAME = + "io.debezium.connector.mysql.SchemaChangeKey"; + + private final boolean includeSchemaChanges; + + public MySqlEventDeserializer( + DebeziumChangelogMode changelogMode, + ZoneId serverTimeZone, + boolean includeSchemaChanges) { + super(changelogMode, serverTimeZone); + this.includeSchemaChanges = includeSchemaChanges; + } + + @Override + protected List deserializeSchemaChangeRecord(SourceRecord record) { + if (includeSchemaChanges) { + // TODO: support schema change event + return Collections.emptyList(); + } + return Collections.emptyList(); + } + + @Override + protected boolean isDataChangeRecord(SourceRecord record) { + Schema valueSchema = record.valueSchema(); + Struct value = (Struct) record.value(); + return value != null + && valueSchema != null + && valueSchema.field(Envelope.FieldName.OPERATION) != null + && value.getString(Envelope.FieldName.OPERATION) != null; + } + + @Override + protected boolean isSchemaChangeRecord(SourceRecord record) { + Schema keySchema = record.keySchema(); + return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()); + } + + @Override + protected TableId getTableId(SourceRecord record) { + if (isDataChangeRecord(record)) { + String[] parts = record.topic().split("\\."); + return TableId.tableId(parts[1], parts[2]); + } + // TODO: get table id from schema change record + return null; + } + + @Override + protected Map getMetadata(SourceRecord record) { + return Collections.emptyMap(); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlMetadataAccessor.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlMetadataAccessor.java new file mode 100644 index 000000000..0ed37cbb5 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlMetadataAccessor.java @@ -0,0 +1,86 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.schema.Schema; +import com.ververica.cdc.common.source.MetadataAccessor; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.ververica.cdc.connectors.mysql.utils.MySqlSchemaUtils; +import io.debezium.connector.mysql.MySqlPartition; + +import javax.annotation.Nullable; + +import java.util.List; + +/** {@link MetadataAccessor} for {@link MySqlDataSource}. 
*/ +@Internal +public class MySqlMetadataAccessor implements MetadataAccessor { + + private final MySqlSourceConfig sourceConfig; + + private final MySqlPartition partition; + + public MySqlMetadataAccessor(MySqlSourceConfig sourceConfig) { + this.sourceConfig = sourceConfig; + this.partition = + new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName()); + } + + /** + * Always throws {@link UnsupportedOperationException} because MySQL does not support namespaces. + */ + @Override + public List<String> listNamespaces() { + throw new UnsupportedOperationException("List namespace is not supported by MySQL."); + } + + /** + * List all databases from MySQL. + * + * @param namespace This parameter is ignored because MySQL does not support namespaces. + * @return The list of databases + */ + @Override + public List<String> listSchemas(@Nullable String namespace) { + return MySqlSchemaUtils.listDatabases(sourceConfig); + } + + /** + * List tables from MySQL. + * + * @param namespace This parameter is ignored because MySQL does not support namespaces. + * @param dbName The database to list tables from. If null, list tables from all databases. + * @return The list of {@link TableId}s. + */ + @Override + public List<TableId> listTables(@Nullable String namespace, @Nullable String dbName) { + return MySqlSchemaUtils.listTables(sourceConfig, dbName); + } + + /** + * Get the {@link Schema} of the given table. + * + * @param tableId The {@link TableId} of the given table. + * @return The {@link Schema} of the table. + */ + @Override + public Schema getTableSchema(TableId tableId) { + return MySqlSchemaUtils.getTableSchema(sourceConfig, partition, tableId); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlSchemaUtils.java new file mode 100644 index 000000000..8eacd95fe --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -0,0 +1,158 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.connectors.mysql.utils; + +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.schema.Column; +import com.ververica.cdc.common.schema.Schema; +import com.ververica.cdc.connectors.mysql.schema.MySqlSchema; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.connector.mysql.MySqlPartition; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Table; +import io.debezium.relational.history.TableChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; +import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote; + +/** Utilities for converting from debezium {@link Table} types to {@link Schema}. */ +public class MySqlSchemaUtils { + + private static final Logger LOG = LoggerFactory.getLogger(MySqlSchemaUtils.class); + + public static List listDatabases(MySqlSourceConfig sourceConfig) { + try (JdbcConnection jdbc = createMySqlConnection(sourceConfig)) { + return listDatabases(jdbc); + } catch (SQLException e) { + throw new RuntimeException("Error to list databases: " + e.getMessage(), e); + } + } + + public static List listTables( + MySqlSourceConfig sourceConfig, @Nullable String dbName) { + try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) { + List databases = + dbName != null ? Collections.singletonList(dbName) : listDatabases(jdbc); + + List tableIds = new ArrayList<>(); + for (String database : databases) { + tableIds.addAll(listTables(jdbc, database)); + } + return tableIds; + } catch (SQLException e) { + throw new RuntimeException("Error to list databases: " + e.getMessage(), e); + } + } + + public static Schema getTableSchema( + MySqlSourceConfig sourceConfig, MySqlPartition partition, TableId tableId) { + try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) { + return getTableSchema(partition, tableId, sourceConfig, jdbc); + } catch (SQLException e) { + throw new RuntimeException("Error to get table schema: " + e.getMessage(), e); + } + } + + public static List listDatabases(JdbcConnection jdbc) throws SQLException { + // ------------------- + // READ DATABASE NAMES + // ------------------- + // Get the list of databases ... + LOG.info("Read list of available databases"); + final List databaseNames = new ArrayList<>(); + jdbc.query( + "SHOW DATABASES WHERE `database` NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')", + rs -> { + while (rs.next()) { + databaseNames.add(rs.getString(1)); + } + }); + LOG.info("\t list of available databases are: {}", databaseNames); + return databaseNames; + } + + public static List listTables(JdbcConnection jdbc, String dbName) throws SQLException { + // ---------------- + // READ TABLE NAMES + // ---------------- + // Get the list of table IDs for each database. We can't use a prepared statement with + // MySQL, so we have to build the SQL statement each time. Although in other cases this + // might lead to SQL injection, in our case we are reading the database names from the + // database and not taking them from the user ... 
+ LOG.info("Read list of available tables in {}", dbName); + final List tableIds = new ArrayList<>(); + jdbc.query( + "SHOW FULL TABLES IN " + quote(dbName) + " where Table_Type = 'BASE TABLE'", + rs -> { + while (rs.next()) { + tableIds.add(TableId.tableId(dbName, rs.getString(1))); + } + }); + LOG.info("\t list of available tables are: {}", tableIds); + return tableIds; + } + + public static Schema getTableSchema( + MySqlPartition partition, + TableId tableId, + MySqlSourceConfig sourceConfig, + MySqlConnection jdbc) { + // fetch table schemas + try (MySqlSchema mySqlSchema = + new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) { + TableChanges.TableChange tableSchema = + mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId)); + return toSchema(tableSchema.getTable()); + } + } + + public static Schema toSchema(Table table) { + List columns = + table.columns().stream() + .map(MySqlSchemaUtils::toColumn) + .collect(Collectors.toList()); + + return Schema.newBuilder() + .setColumns(columns) + .primaryKey(table.primaryKeyColumnNames()) + .comment(table.comment()) + .build(); + } + + public static Column toColumn(io.debezium.relational.Column column) { + return Column.physicalColumn( + column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment()); + } + + public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { + return new io.debezium.relational.TableId( + tableId.getSchemaName(), null, tableId.getTableName()); + } + + private MySqlSchemaUtils() {} +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlTypeUtils.java new file mode 100644 index 000000000..a0d5d0105 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -0,0 +1,186 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.utils; + +import com.ververica.cdc.common.types.DataType; +import com.ververica.cdc.common.types.DataTypes; +import io.debezium.relational.Column; + +/** Utilities for converting from MySQL types to {@link DataType}s. 
*/ +public class MySqlTypeUtils { + + // ------ MySQL Type ------ + // https://dev.mysql.com/doc/refman/8.0/en/data-types.html + private static final String BIT = "BIT"; + private static final String TINYINT = "TINYINT"; + private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED"; + private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL"; + private static final String SMALLINT = "SMALLINT"; + private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; + private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL"; + private static final String MEDIUMINT = "MEDIUMINT"; + private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; + private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL"; + private static final String INT = "INT"; + private static final String INT_UNSIGNED = "INT UNSIGNED"; + private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL"; + private static final String BIGINT = "BIGINT"; + private static final String SERIAL = "SERIAL"; + private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED"; + private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL"; + private static final String REAL = "REAL"; + private static final String REAL_UNSIGNED = "REAL UNSIGNED"; + private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL"; + private static final String FLOAT = "FLOAT"; + private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED"; + private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL"; + private static final String DOUBLE = "DOUBLE"; + private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; + private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL"; + private static final String DOUBLE_PRECISION = "DOUBLE PRECISION"; + private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED"; + private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL = + "DOUBLE PRECISION UNSIGNED ZEROFILL"; + private static final String NUMERIC = "NUMERIC"; + private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED"; + private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL"; + private static final String FIXED = "FIXED"; + private static final String FIXED_UNSIGNED = "FIXED UNSIGNED"; + private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL"; + private static final String DECIMAL = "DECIMAL"; + private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; + private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL"; + private static final String CHAR = "CHAR"; + private static final String VARCHAR = "VARCHAR"; + private static final String TINYTEXT = "TINYTEXT"; + private static final String MEDIUMTEXT = "MEDIUMTEXT"; + private static final String TEXT = "TEXT"; + private static final String LONGTEXT = "LONGTEXT"; + private static final String DATE = "DATE"; + private static final String TIME = "TIME"; + private static final String DATETIME = "DATETIME"; + private static final String TIMESTAMP = "TIMESTAMP"; + private static final String YEAR = "YEAR"; + private static final String BINARY = "BINARY"; + private static final String VARBINARY = "VARBINARY"; + private static final String TINYBLOB = "TINYBLOB"; + private static final String MEDIUMBLOB = "MEDIUMBLOB"; + private static final String BLOB = "BLOB"; + private static final String LONGBLOB = 
"LONGBLOB"; + private static final String JSON = "JSON"; + private static final String SET = "SET"; + private static final String ENUM = "ENUM"; + private static final String GEOMETRY = "GEOMETRY"; + private static final String UNKNOWN = "UNKNOWN"; + + /** Returns a corresponding Flink data type from a debezium {@link Column}. */ + public static DataType fromDbzColumn(Column column) { + DataType dataType = convertFromColumn(column); + if (column.isOptional()) { + return dataType; + } else { + return dataType.notNull(); + } + } + + /** + * Returns a corresponding Flink data type from a debezium {@link Column} with nullable always + * be true. + */ + private static DataType convertFromColumn(Column column) { + String typeName = column.typeName(); + switch (typeName) { + case TINYINT: + // MySQL haven't boolean type, it uses tinyint(1) to represents boolean type + // user should not use tinyint(1) to store number although jdbc url parameter + // tinyInt1isBit=false can help change the return value, it's not a general way + // btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default + return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT(); + case TINYINT_UNSIGNED: + case TINYINT_UNSIGNED_ZEROFILL: + case SMALLINT: + return DataTypes.SMALLINT(); + case SMALLINT_UNSIGNED: + case SMALLINT_UNSIGNED_ZEROFILL: + case INT: + case MEDIUMINT: + return DataTypes.INT(); + case INT_UNSIGNED: + case INT_UNSIGNED_ZEROFILL: + case MEDIUMINT_UNSIGNED: + case MEDIUMINT_UNSIGNED_ZEROFILL: + case BIGINT: + return DataTypes.BIGINT(); + case BIGINT_UNSIGNED: + case BIGINT_UNSIGNED_ZEROFILL: + case SERIAL: + return DataTypes.DECIMAL(20, 0); + case FLOAT: + case FLOAT_UNSIGNED: + case FLOAT_UNSIGNED_ZEROFILL: + return DataTypes.FLOAT(); + case REAL: + case REAL_UNSIGNED: + case REAL_UNSIGNED_ZEROFILL: + case DOUBLE: + case DOUBLE_UNSIGNED: + case DOUBLE_UNSIGNED_ZEROFILL: + case DOUBLE_PRECISION: + case DOUBLE_PRECISION_UNSIGNED: + case DOUBLE_PRECISION_UNSIGNED_ZEROFILL: + return DataTypes.DOUBLE(); + case NUMERIC: + case NUMERIC_UNSIGNED: + case NUMERIC_UNSIGNED_ZEROFILL: + case FIXED: + case FIXED_UNSIGNED: + case FIXED_UNSIGNED_ZEROFILL: + case DECIMAL: + case DECIMAL_UNSIGNED: + case DECIMAL_UNSIGNED_ZEROFILL: + return column.length() <= 38 + ? DataTypes.DECIMAL(column.length(), column.scale().orElse(0)) + : DataTypes.STRING(); + case TIME: + return column.length() >= 0 ? DataTypes.TIME(column.length()) : DataTypes.TIME(); + case DATE: + return DataTypes.DATE(); + case DATETIME: + case TIMESTAMP: + return column.length() >= 0 + ? 
DataTypes.TIMESTAMP_LTZ(column.length()) + : DataTypes.TIMESTAMP_LTZ(); + case CHAR: + return DataTypes.CHAR(column.length()); + case VARCHAR: + return DataTypes.VARCHAR(column.length()); + case TEXT: + return DataTypes.STRING(); + case BINARY: + return DataTypes.BINARY(column.length()); + case VARBINARY: + return DataTypes.VARBINARY(column.length()); + case BLOB: + return DataTypes.BYTES(); + default: + throw new UnsupportedOperationException( + String.format("Don't support MySQL type '%s' yet.", typeName)); + } + } + + private MySqlTypeUtils() {} +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory new file mode 100644 index 000000000..3f53f1a6b --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory @@ -0,0 +1,15 @@ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.ververica.cdc.connectors.mysql.factory.MySqlDataSourceFactory diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlEventSourceITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlEventSourceITCase.java new file mode 100644 index 000000000..b41de8011 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlEventSourceITCase.java @@ -0,0 +1,238 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
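[Editor's note, not part of the patch] A minimal sketch of how the MySqlTypeUtils.fromDbzColumn helper above could be exercised, assuming Debezium's io.debezium.relational.Column/ColumnEditor API is available on the classpath; the column definition is hypothetical and only illustrates the TINYINT(1)-to-BOOLEAN special case described in the comments of convertFromColumn.

import com.ververica.cdc.common.types.DataType;
import io.debezium.relational.Column;

import java.sql.Types;

public class MySqlTypeUtilsExample {
    public static void main(String[] args) {
        // A hypothetical TINYINT(1) NOT NULL column as Debezium would describe it.
        Column flag =
                Column.editor()
                        .name("is_active")
                        .type("TINYINT")
                        .jdbcType(Types.TINYINT)
                        .length(1)
                        .optional(false)
                        .create();
        // Under the mapping added in this patch, this is expected to be BOOLEAN NOT NULL,
        // because MySQL commonly uses TINYINT(1) to represent booleans.
        DataType dataType = MySqlTypeUtils.fromDbzColumn(flag);
        System.out.println(dataType);
    }
}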
+ */ + +package com.ververica.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CloseableIterator; + +import com.ververica.cdc.common.data.binary.BinaryRecordData; +import com.ververica.cdc.common.data.binary.BinaryStringData; +import com.ververica.cdc.common.event.DataChangeEvent; +import com.ververica.cdc.common.event.Event; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.types.DataTypes; +import com.ververica.cdc.common.types.RowType; +import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import com.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; +import com.ververica.cdc.debezium.table.DebeziumChangelogMode; +import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import io.debezium.jdbc.JdbcConnection; +import org.junit.After; +import org.junit.Test; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link MySqlSource} with {@link MySqlEventDeserializer}. */ +public class MySqlEventSourceITCase extends MySqlSourceTestBase { + + private final BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + RowType.of( + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING())); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); + + @After + public void clear() { + customerDatabase.dropDatabase(); + } + + @Test + public void testProduceDataChangeEvents() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.noRestart()); + + customerDatabase.createAndInitialize(); + + MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}); + MySqlSource eventSource = getMySqlSource(sourceConfig); + + DataStreamSource source = + env.fromSource(eventSource, WatermarkStrategy.noWatermarks(), "Event-Source"); + + CloseableIterator iterator = source.collectAsync(); + env.executeAsync(); + + TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers"); + + List snapshotRecords = + customerRecordData( + Arrays.asList( + new Object[] {101, "user_1", "Shanghai", "123567891234"}, + new Object[] {102, "user_2", "Shanghai", "123567891234"}, + new Object[] {103, "user_3", "Shanghai", "123567891234"}, + new Object[] {109, "user_4", "Shanghai", "123567891234"}, + new Object[] {110, "user_5", "Shanghai", "123567891234"}, + new Object[] {111, "user_6", "Shanghai", "123567891234"}, + new Object[] {118, "user_7", "Shanghai", "123567891234"}, + new Object[] {121, "user_8", "Shanghai", "123567891234"}, + new Object[] {123, "user_9", "Shanghai", "123567891234"}, + new Object[] {1009, "user_10", "Shanghai", 
"123567891234"}, + new Object[] {1010, "user_11", "Shanghai", "123567891234"}, + new Object[] {1011, "user_12", "Shanghai", "123567891234"}, + new Object[] {1012, "user_13", "Shanghai", "123567891234"}, + new Object[] {1013, "user_14", "Shanghai", "123567891234"}, + new Object[] {1014, "user_15", "Shanghai", "123567891234"}, + new Object[] {1015, "user_16", "Shanghai", "123567891234"}, + new Object[] {1016, "user_17", "Shanghai", "123567891234"}, + new Object[] {1017, "user_18", "Shanghai", "123567891234"}, + new Object[] {1018, "user_19", "Shanghai", "123567891234"}, + new Object[] {1019, "user_20", "Shanghai", "123567891234"}, + new Object[] {2000, "user_21", "Shanghai", "123567891234"})); + + List expectedSnapshotResults = + snapshotRecords.stream() + .map( + record -> + DataChangeEvent.insertEvent( + tableId, record, Collections.emptyMap())) + .collect(Collectors.toList()); + + List snapshotResults = fetchResults(iterator, expectedSnapshotResults.size()); + assertThat(snapshotResults).containsExactlyInAnyOrderElementsOf(expectedSnapshotResults); + + try (JdbcConnection connection = DebeziumUtils.createMySqlConnection(sourceConfig)) { + connection.setAutoCommit(false); + connection.execute( + "INSERT INTO " + + tableId + + " VALUES(2001, 'user_22','Shanghai','123567891234')," + + " (2002, 'user_23','Shanghai','123567891234')," + + "(2003, 'user_24','Shanghai','123567891234')"); + connection.execute("DELETE FROM " + tableId + " where id = 101"); + connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010"); + connection.commit(); + } + + List expectedStreamRecords = new ArrayList<>(); + + List insertedRecords = + customerRecordData( + Arrays.asList( + new Object[] {2001, "user_22", "Shanghai", "123567891234"}, + new Object[] {2002, "user_23", "Shanghai", "123567891234"}, + new Object[] {2003, "user_24", "Shanghai", "123567891234"})); + expectedStreamRecords.addAll( + insertedRecords.stream() + .map( + record -> + DataChangeEvent.insertEvent( + tableId, record, Collections.emptyMap())) + .collect(Collectors.toList())); + + BinaryRecordData deletedRecord = + customerRecordData(new Object[] {101, "user_1", "Shanghai", "123567891234"}); + expectedStreamRecords.add( + DataChangeEvent.deleteEvent(tableId, deletedRecord, Collections.emptyMap())); + + BinaryRecordData updateBeforeRecord = + customerRecordData(new Object[] {1010, "user_11", "Shanghai", "123567891234"}); + BinaryRecordData updateAfterRecord = + customerRecordData(new Object[] {1010, "user_11", "Hangzhou", "123567891234"}); + expectedStreamRecords.add( + DataChangeEvent.updateEvent( + tableId, updateBeforeRecord, updateAfterRecord, Collections.emptyMap())); + + List streamResults = fetchResults(iterator, expectedStreamRecords.size()); + assertThat(streamResults).isEqualTo(expectedStreamRecords); + } + + private MySqlSource getMySqlSource(MySqlSourceConfig sourceConfig) { + MySqlEventDeserializer deserializer = + new MySqlEventDeserializer( + DebeziumChangelogMode.ALL, + ZoneId.of(sourceConfig.getServerTimeZone()), + sourceConfig.isIncludeSchemaChanges()); + + return MySqlSource.builder() + .hostname(sourceConfig.getHostname()) + .port(sourceConfig.getPort()) + .databaseList(sourceConfig.getDatabaseList().toArray(new String[0])) + .tableList(sourceConfig.getTableList().toArray(new String[0])) + .username(sourceConfig.getUsername()) + .password(sourceConfig.getPassword()) + .deserializer(deserializer) + .serverTimeZone(sourceConfig.getServerTimeZone()) + .debeziumProperties(sourceConfig.getDbzProperties()) + 
.build(); + } + + private MySqlSourceConfig getConfig(String[] captureTables) { + String[] captureTableIds = + Arrays.stream(captureTables) + .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName) + .toArray(String[]::new); + + return new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.latest()) + .databaseList(customerDatabase.getDatabaseName()) + .tableList(captureTableIds) + .includeSchemaChanges(false) + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .splitSize(10) + .fetchSize(2) + .username(customerDatabase.getUsername()) + .password(customerDatabase.getPassword()) + .serverTimeZone(ZoneId.of("UTC").toString()) + .createConfig(0); + } + + private List customerRecordData(List records) { + return records.stream().map(this::customerRecordData).collect(Collectors.toList()); + } + + private BinaryRecordData customerRecordData(Object[] record) { + return generator.generate( + new Object[] { + record[0], + BinaryStringData.fromString((String) record[1]), + BinaryStringData.fromString((String) record[2]), + BinaryStringData.fromString((String) record[3]) + }); + } + + private static List fetchResults(Iterator iter, int size) { + List result = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Event event = iter.next(); + result.add(event); + size--; + } + return result; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql new file mode 100644 index 000000000..b80846345 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql @@ -0,0 +1,326 @@ +-- Copyright 2023 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+ +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: customer +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate our users using a single insert with many rows +CREATE TABLE customers ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"), + (110,"user_5","Shanghai","123567891234"), + (111,"user_6","Shanghai","123567891234"), + (118,"user_7","Shanghai","123567891234"), + (121,"user_8","Shanghai","123567891234"), + (123,"user_9","Shanghai","123567891234"), + (1009,"user_10","Shanghai","123567891234"), + (1010,"user_11","Shanghai","123567891234"), + (1011,"user_12","Shanghai","123567891234"), + (1012,"user_13","Shanghai","123567891234"), + (1013,"user_14","Shanghai","123567891234"), + (1014,"user_15","Shanghai","123567891234"), + (1015,"user_16","Shanghai","123567891234"), + (1016,"user_17","Shanghai","123567891234"), + (1017,"user_18","Shanghai","123567891234"), + (1018,"user_19","Shanghai","123567891234"), + (1019,"user_20","Shanghai","123567891234"), + (2000,"user_21","Shanghai","123567891234"); + +-- Create a table will not be read +CREATE TABLE prefix_customers ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO prefix_customers +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"); + +-- table has same name prefix with 'customers.*' +CREATE TABLE customers_1 ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers_1 +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"), + (110,"user_5","Shanghai","123567891234"), + (111,"user_6","Shanghai","123567891234"), + (118,"user_7","Shanghai","123567891234"), + (121,"user_8","Shanghai","123567891234"), + (123,"user_9","Shanghai","123567891234"), + (1009,"user_10","Shanghai","123567891234"), + (1010,"user_11","Shanghai","123567891234"), + (1011,"user_12","Shanghai","123567891234"), + (1012,"user_13","Shanghai","123567891234"), + (1013,"user_14","Shanghai","123567891234"), + (1014,"user_15","Shanghai","123567891234"), + (1015,"user_16","Shanghai","123567891234"), + (1016,"user_17","Shanghai","123567891234"), + (1017,"user_18","Shanghai","123567891234"), + (1018,"user_19","Shanghai","123567891234"), + (1019,"user_20","Shanghai","123567891234"), + (2000,"user_21","Shanghai","123567891234"); + +-- create table whose split key is evenly distributed +CREATE TABLE customers_even_dist ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL , + address VARCHAR(1024), + phone_number VARCHAR(512) +); +INSERT INTO customers_even_dist +VALUES (101,'user_1','Shanghai','123567891234'), + (102,'user_2','Shanghai','123567891234'), + (103,'user_3','Shanghai','123567891234'), + (104,'user_4','Shanghai','123567891234'), + 
(105,'user_5','Shanghai','123567891234'), + (106,'user_6','Shanghai','123567891234'), + (107,'user_7','Shanghai','123567891234'), + (108,'user_8','Shanghai','123567891234'), + (109,'user_9','Shanghai','123567891234'), + (110,'user_10','Shanghai','123567891234'); + +-- create table whose split key is evenly distributed and sparse +CREATE TABLE customers_sparse_dist ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL , + address VARCHAR(1024), + phone_number VARCHAR(512) +); +INSERT INTO customers_sparse_dist +VALUES (2,'user_1','Shanghai','123567891234'), + (4,'user_2','Shanghai','123567891234'), + (6,'user_3','Shanghai','123567891234'), + (8,'user_4','Shanghai','123567891234'), + (10,'user_5','Shanghai','123567891234'), + (16,'user_6','Shanghai','123567891234'), + (17,'user_7','Shanghai','123567891234'), + (18,'user_8','Shanghai','123567891234'), + (20,'user_9','Shanghai','123567891234'), + (22,'user_10','Shanghai','123567891234'); + +-- create table whose split key is evenly distributed and dense +CREATE TABLE customers_dense_dist ( + id1 INTEGER NOT NULL, + id2 VARCHAR(255) NOT NULL , + address VARCHAR(1024), + phone_number VARCHAR(512), + PRIMARY KEY(id1, id2) +); +INSERT INTO customers_dense_dist +VALUES (1,'user_1','Shanghai','123567891234'), + (1,'user_2','Shanghai','123567891234'), + (1,'user_3','Shanghai','123567891234'), + (1,'user_4','Shanghai','123567891234'), + (2,'user_5','Shanghai','123567891234'), + (2,'user_6','Shanghai','123567891234'), + (2,'user_7','Shanghai','123567891234'), + (3,'user_8','Shanghai','123567891234'), + (3,'user_9','Shanghai','123567891234'), + (3,'user_10','Shanghai','123567891234'); + +CREATE TABLE customers_no_pk ( + id INTEGER, + name VARCHAR(255) DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers_no_pk +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"), + (110,"user_5","Shanghai","123567891234"), + (111,"user_6","Shanghai","123567891234"), + (118,"user_7","Shanghai","123567891234"), + (121,"user_8","Shanghai","123567891234"), + (123,"user_9","Shanghai","123567891234"), + (1009,"user_10","Shanghai","123567891234"), + (1010,"user_11","Shanghai","123567891234"), + (1011,"user_12","Shanghai","123567891234"), + (1012,"user_13","Shanghai","123567891234"), + (1013,"user_14","Shanghai","123567891234"), + (1014,"user_15","Shanghai","123567891234"), + (1015,"user_16","Shanghai","123567891234"), + (1016,"user_17","Shanghai","123567891234"), + (1017,"user_18","Shanghai","123567891234"), + (1018,"user_19","Shanghai","123567891234"), + (1019,"user_20","Shanghai","123567891234"), + (2000,"user_21","Shanghai","123567891234"); + +-- table has combined primary key +CREATE TABLE customer_card ( + card_no BIGINT NOT NULL, + level VARCHAR(10) NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + note VARCHAR(1024), + PRIMARY KEY(card_no, level) +); + +insert into customer_card +VALUES (20001, 'LEVEL_4', 'user_1', 'user with level 4'), + (20002, 'LEVEL_4', 'user_2', 'user with level 4'), + (20003, 'LEVEL_4', 'user_3', 'user with level 4'), + (20004, 'LEVEL_4', 'user_4', 'user with level 4'), + (20004, 'LEVEL_1', 'user_4', 'user with level 4'), + (20004, 'LEVEL_2', 'user_4', 'user with level 4'), + (20004, 'LEVEL_3', 'user_4', 'user with level 4'), + (30006, 'LEVEL_3', 'user_5', 'user with level 3'), + (30007, 'LEVEL_3', 'user_6', 'user with level 3'), + (30008, 'LEVEL_3', 
'user_7', 'user with level 3'), + (30009, 'LEVEL_3', 'user_8', 'user with level 3'), + (30009, 'LEVEL_2', 'user_8', 'user with level 3'), + (30009, 'LEVEL_1', 'user_8', 'user with level 3'), + (40001, 'LEVEL_2', 'user_9', 'user with level 2'), + (40002, 'LEVEL_2', 'user_10', 'user with level 2'), + (40003, 'LEVEL_2', 'user_11', 'user with level 2'), + (50001, 'LEVEL_1', 'user_12', 'user with level 1'), + (50002, 'LEVEL_1', 'user_13', 'user with level 1'), + (50003, 'LEVEL_1', 'user_14', 'user with level 1'); + +-- table has single line +CREATE TABLE customer_card_single_line ( + card_no BIGINT NOT NULL, + level VARCHAR(10) NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + note VARCHAR(1024), + PRIMARY KEY(card_no, level) +); + +insert into customer_card_single_line +VALUES (20001, 'LEVEL_1', 'user_1', 'user with level 1'); + + +-- table has combined primary key +CREATE TABLE shopping_cart ( + product_no INT NOT NULL, + product_kind VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + description VARCHAR(255) NOT NULL, + PRIMARY KEY(user_id, product_no, product_kind) +); + +insert into shopping_cart +VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'), + (101, 'KIND_002', 'user_1', 'my shopping cart'), + (102, 'KIND_007', 'user_1', 'my shopping cart'), + (102, 'KIND_008', 'user_1', 'my shopping cart'), + (501, 'KIND_100', 'user_2', 'my shopping list'), + (701, 'KIND_999', 'user_3', 'my shopping list'), + (801, 'KIND_010', 'user_4', 'my shopping list'), + (600, 'KIND_009', 'user_4', 'my shopping list'), + (401, 'KIND_002', 'user_5', 'leo list'), + (401, 'KIND_007', 'user_5', 'leo list'), + (404, 'KIND_008', 'user_5', 'leo list'), + (600, 'KIND_009', 'user_6', 'my shopping cart'); + +-- table has combined primary key and one of the primary key is evenly +CREATE TABLE evenly_shopping_cart ( + product_no INT NOT NULL, + product_kind VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + description VARCHAR(255) NOT NULL, + PRIMARY KEY(product_kind, product_no, user_id) +); + +insert into evenly_shopping_cart +VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'), + (102, 'KIND_002', 'user_1', 'my shopping cart'), + (103, 'KIND_007', 'user_1', 'my shopping cart'), + (104, 'KIND_008', 'user_1', 'my shopping cart'), + (105, 'KIND_100', 'user_2', 'my shopping list'), + (105, 'KIND_999', 'user_3', 'my shopping list'), + (107, 'KIND_010', 'user_4', 'my shopping list'), + (108, 'KIND_009', 'user_4', 'my shopping list'), + (109, 'KIND_002', 'user_5', 'leo list'), + (111, 'KIND_007', 'user_5', 'leo list'), + (111, 'KIND_008', 'user_5', 'leo list'), + (112, 'KIND_009', 'user_6', 'my shopping cart'); + +-- table has bigint unsigned auto increment primary key +CREATE TABLE shopping_cart_big ( + product_no BIGINT UNSIGNED AUTO_INCREMENT NOT NULL, + product_kind VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + description VARCHAR(255) NOT NULL, + PRIMARY KEY(product_no) +); + +insert into shopping_cart_big +VALUES (default, 'KIND_001', 'user_1', 'my shopping cart'), + (default, 'KIND_002', 'user_1', 'my shopping cart'), + (default, 'KIND_003', 'user_1', 'my shopping cart'); + +-- table has decimal primary key +CREATE TABLE shopping_cart_dec ( + product_no DECIMAL(10, 4) NOT NULL, + product_kind VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + description VARCHAR(255) DEFAULT 'flink', + PRIMARY KEY(product_no) +); + +insert into shopping_cart_dec +VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'), + (123457.456, 'KIND_002', 'user_2', 'my shopping cart'), + (123458.6789, 'KIND_003', 'user_3', 'my 
shopping cart'), + (123459.1234, 'KIND_004', 'user_4', null); + +-- create table whose primary key are produced by snowflake algorithm +CREATE TABLE address ( + id BIGINT UNSIGNED NOT NULL PRIMARY KEY, + country VARCHAR(255) NOT NULL, + city VARCHAR(255) NOT NULL, + detail_address VARCHAR(1024) +); + +INSERT INTO address +VALUES (416874195632735147, 'China', 'Beijing', 'West Town address 1'), + (416927583791428523, 'China', 'Beijing', 'West Town address 2'), + (417022095255614379, 'China', 'Beijing', 'West Town address 3'), + (417111867899200427, 'America', 'New York', 'East Town address 1'), + (417271541558096811, 'America', 'New York', 'East Town address 2'), + (417272886855938987, 'America', 'New York', 'East Town address 3'), + (417420106184475563, 'Germany', 'Berlin', 'West Town address 1'), + (418161258277847979, 'Germany', 'Berlin', 'West Town address 2'); + +CREATE TABLE default_value_test ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number INTEGER DEFAULT ' 123 ' +); +INSERT INTO default_value_test +VALUES (1,'user1','Shanghai',123567), + (2,'user2','Shanghai',123567); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-gtids/expire-seconds/my.cnf b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-gtids/expire-seconds/my.cnf new file mode 100644 index 000000000..5715c45e0 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-gtids/expire-seconds/my.cnf @@ -0,0 +1,64 @@ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. 
+# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems. +server-id = 223344 +log_bin = mysql-bin +binlog_format = row +# Make binlog_expire_logs_seconds = 1 and max_binlog_size = 4096 to test the exception +# message when the binlog expires in the server. +binlog_expire_logs_seconds = 1 +max_binlog_size = 4096 + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-gtids/my.cnf b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-gtids/my.cnf new file mode 100644 index 000000000..fd09af85d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/server-gtids/my.cnf @@ -0,0 +1,61 @@ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. 
+# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/setup.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/setup.sql new file mode 100644 index 000000000..8586a8489 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/docker/setup.sql @@ -0,0 +1,28 @@ +-- Copyright 2023 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- In production you would almost certainly limit the replication user must be on the follower (slave) machine, +-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'. 
+-- However, in this database we'll grant 2 users different privileges: +-- +-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing) +-- 2) 'mysqluser' - all privileges +-- +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%'; +CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw'; +GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%'; + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: emptydb +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE emptydb; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..a9d045e0e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/log4j2-test.properties @@ -0,0 +1,26 @@ +################################################################################ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/pipeline-test.yaml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/pipeline-test.yaml new file mode 100644 index 000000000..dc94f571d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/pipeline-test.yaml @@ -0,0 +1,29 @@ +################################################################################ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +source: + type: mysql + name: MySQL Source + hostname: localhost + port: 3306 + username: root + password: root + tables: test.\.* + +sink: + type: values + name: Values Sink + +pipeline: + pipeline.global.parallelism: 1 \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory index e63e17463..bc3d3170e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/resources/META-INF/services/com.ververica.cdc.common.factories.Factory @@ -1,9 +1,8 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml index c3ef405ad..b68c60976 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml @@ -27,6 +27,7 @@ under the License. pom flink-cdc-pipeline-connector-values + flink-cdc-pipeline-connector-mysql diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/ConnectSchemaTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/ConnectSchemaTypeInference.java new file mode 100644 index 000000000..f1eeb6235 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/ConnectSchemaTypeInference.java @@ -0,0 +1,67 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.debezium.event; + +import com.ververica.cdc.common.types.DataField; +import com.ververica.cdc.common.types.DataType; +import com.ververica.cdc.common.types.DataTypes; +import org.apache.kafka.connect.data.Schema; + +/** Utility class to convert {@link Schema} to {@link DataType}. */ +public class ConnectSchemaTypeInference { + + public static DataType infer(Schema schema) { + return schema.isOptional() + ? infer(schema, schema.type()) + : infer(schema, schema.type()).notNull(); + } + + private static DataType infer(Schema schema, Schema.Type type) { + switch (type) { + case INT8: + return DataTypes.TINYINT(); + case INT16: + return DataTypes.SMALLINT(); + case INT32: + return DataTypes.INT(); + case INT64: + return DataTypes.BIGINT(); + case FLOAT32: + return DataTypes.FLOAT(); + case FLOAT64: + return DataTypes.DOUBLE(); + case BOOLEAN: + return DataTypes.BOOLEAN(); + case STRING: + return DataTypes.STRING(); + case BYTES: + return DataTypes.BYTES(); + case ARRAY: + return DataTypes.ARRAY(infer(schema.valueSchema())); + case MAP: + return DataTypes.MAP(infer(schema.keySchema()), infer(schema.valueSchema())); + case STRUCT: + return DataTypes.ROW( + schema.fields().stream() + .map(f -> DataTypes.FIELD(f.name(), infer(f.schema()))) + .toArray(DataField[]::new)); + default: + throw new UnsupportedOperationException( + "Unsupported type: " + schema.type().getName()); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java new file mode 100644 index 000000000..cdaa95614 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -0,0 +1,539 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
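[Editor's note, not part of the patch] A short sketch of how the ConnectSchemaTypeInference.infer utility above might be used, assuming only the standard Kafka Connect SchemaBuilder API; the two-field schema is hypothetical and stands in for the value schema Debezium would produce for a simple table.

import com.ververica.cdc.common.types.DataType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class ConnectSchemaTypeInferenceExample {
    public static void main(String[] args) {
        // A Connect struct schema with one required and one optional field.
        Schema valueSchema =
                SchemaBuilder.struct()
                        .field("id", Schema.INT32_SCHEMA)
                        .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                        .build();
        // Expected to infer roughly ROW<id INT NOT NULL, name STRING> NOT NULL,
        // since infer() applies notNull() whenever the Connect schema is not optional.
        DataType rowType = ConnectSchemaTypeInference.infer(valueSchema);
        System.out.println(rowType);
    }
}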
+ */ + +package com.ververica.cdc.debezium.event; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.data.DecimalData; +import com.ververica.cdc.common.data.RecordData; +import com.ververica.cdc.common.data.TimestampData; +import com.ververica.cdc.common.data.binary.BinaryStringData; +import com.ververica.cdc.common.event.DataChangeEvent; +import com.ververica.cdc.common.event.Event; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.types.DataField; +import com.ververica.cdc.common.types.DataType; +import com.ververica.cdc.common.types.DecimalType; +import com.ververica.cdc.common.types.RowType; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.table.DebeziumChangelogMode; +import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter; +import com.ververica.cdc.debezium.utils.TemporalConversions; +import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import com.ververica.cdc.runtime.typeutils.EventTypeInfo; +import io.debezium.data.Envelope; +import io.debezium.data.SpecialValueDecimal; +import io.debezium.data.VariableScaleDecimal; +import io.debezium.time.MicroTime; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.NanoTime; +import io.debezium.time.NanoTimestamp; +import io.debezium.time.Timestamp; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Debezium event deserializer for {@link SourceRecord}. */ +@Internal +public abstract class DebeziumEventDeserializationSchema extends SourceRecordEventDeserializer + implements DebeziumDeserializationSchema { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = + LoggerFactory.getLogger(DebeziumEventDeserializationSchema.class); + + private static final Map CONVERTERS = + new ConcurrentHashMap<>(); + + /** Changelog Mode to use for encoding changes in Flink internal data structure. */ + protected final DebeziumChangelogMode changelogMode; + + /** The session time zone in database server. 
*/ + protected final ZoneId serverTimeZone; + + public DebeziumEventDeserializationSchema( + DebeziumChangelogMode changelogMode, ZoneId serverTimeZone) { + this.changelogMode = changelogMode; + this.serverTimeZone = serverTimeZone; + } + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + deserialize(record).forEach(out::collect); + } + + @Override + public List deserializeDataChangeRecord(SourceRecord record) throws Exception { + Envelope.Operation op = Envelope.operationFor(record); + TableId tableId = getTableId(record); + + Struct value = (Struct) record.value(); + Schema valueSchema = record.valueSchema(); + Map meta = getMetadata(record); + + if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { + RecordData after = extractAfterDataRecord(value, valueSchema); + return Collections.singletonList(DataChangeEvent.insertEvent(tableId, after, meta)); + } else if (op == Envelope.Operation.DELETE) { + RecordData before = extractBeforeDataRecord(value, valueSchema); + return Collections.singletonList(DataChangeEvent.deleteEvent(tableId, before, meta)); + } else if (op == Envelope.Operation.UPDATE) { + RecordData after = extractAfterDataRecord(value, valueSchema); + if (changelogMode == DebeziumChangelogMode.ALL) { + RecordData before = extractBeforeDataRecord(value, valueSchema); + return Collections.singletonList( + DataChangeEvent.updateEvent(tableId, before, after, meta)); + } + return Collections.singletonList( + DataChangeEvent.updateEvent(tableId, null, after, meta)); + } else { + LOG.trace("Received {} operation, skip", op); + return Collections.emptyList(); + } + } + + @Override + public TypeInformation getProducedType() { + return new EventTypeInfo(); + } + + private RecordData extractBeforeDataRecord(Struct value, Schema valueSchema) throws Exception { + Schema beforeSchema = fieldSchema(valueSchema, Envelope.FieldName.BEFORE); + Struct beforeValue = fieldStruct(value, Envelope.FieldName.BEFORE); + return extractDataRecord(beforeValue, beforeSchema); + } + + private RecordData extractAfterDataRecord(Struct value, Schema valueSchema) throws Exception { + Schema afterSchema = fieldSchema(valueSchema, Envelope.FieldName.AFTER); + Struct afterValue = fieldStruct(value, Envelope.FieldName.AFTER); + return extractDataRecord(afterValue, afterSchema); + } + + private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception { + DataType dataType = ConnectSchemaTypeInference.infer(valueSchema); + return (RecordData) + getOrCreateConverter(dataType, serverTimeZone).convert(value, valueSchema); + } + + private static DeserializationRuntimeConverter getOrCreateConverter( + DataType type, ZoneId serverTimeZone) { + return CONVERTERS.computeIfAbsent(type, t -> createConverter(t, serverTimeZone)); + } + + // ------------------------------------------------------------------------------------- + // Runtime Converters + // ------------------------------------------------------------------------------------- + + /** Creates a runtime converter which is null safe. */ + private static DeserializationRuntimeConverter createConverter( + DataType type, ZoneId serverTimeZone) { + return wrapIntoNullableConverter(createNotNullConverter(type, serverTimeZone)); + } + + // -------------------------------------------------------------------------------- + // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. 
It is + // necessary because the maven shade plugin cannot relocate classes in + // SerializedLambdas (MSHADE-260). + // -------------------------------------------------------------------------------- + + /** Creates a runtime converter which assuming input object is not null. */ + public static DeserializationRuntimeConverter createNotNullConverter( + DataType type, ZoneId serverTimeZone) { + // if no matched user defined converter, fallback to the default converter + switch (type.getTypeRoot()) { + case BOOLEAN: + return convertToBoolean(); + case TINYINT: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return Byte.parseByte(dbzObj.toString()); + } + }; + case SMALLINT: + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return Short.parseShort(dbzObj.toString()); + } + }; + case INTEGER: + return convertToInt(); + case BIGINT: + return convertToLong(); + case DATE: + return convertToDate(); + case TIME_WITHOUT_TIME_ZONE: + return convertToTime(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return convertToTimestamp(serverTimeZone); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertToLocalTimeZoneTimestamp(serverTimeZone); + case FLOAT: + return convertToFloat(); + case DOUBLE: + return convertToDouble(); + case CHAR: + case VARCHAR: + return convertToString(); + case BINARY: + case VARBINARY: + return convertToBinary(); + case DECIMAL: + return createDecimalConverter((DecimalType) type); + case ROW: + return createRowConverter((RowType) type, serverTimeZone); + case ARRAY: + case MAP: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + private static DeserializationRuntimeConverter convertToBoolean() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Boolean) { + return dbzObj; + } else if (dbzObj instanceof Byte) { + return (byte) dbzObj == 1; + } else if (dbzObj instanceof Short) { + return (short) dbzObj == 1; + } else { + return Boolean.parseBoolean(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToInt() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Integer) { + return dbzObj; + } else if (dbzObj instanceof Long) { + return ((Long) dbzObj).intValue(); + } else { + return Integer.parseInt(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToLong() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Integer) { + return ((Integer) dbzObj).longValue(); + } else if (dbzObj instanceof Long) { + return dbzObj; + } else { + return Long.parseLong(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToDouble() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Float) { + return ((Float) 
dbzObj).doubleValue(); + } else if (dbzObj instanceof Double) { + return dbzObj; + } else { + return Double.parseDouble(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToFloat() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Float) { + return dbzObj; + } else if (dbzObj instanceof Double) { + return ((Double) dbzObj).floatValue(); + } else { + return Float.parseFloat(dbzObj.toString()); + } + } + }; + } + + private static DeserializationRuntimeConverter convertToDate() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay(); + } + }; + } + + private static DeserializationRuntimeConverter convertToTime() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Long) { + switch (schema.name()) { + case MicroTime.SCHEMA_NAME: + return (int) ((long) dbzObj / 1000); + case NanoTime.SCHEMA_NAME: + return (int) ((long) dbzObj / 1000_000); + } + } else if (dbzObj instanceof Integer) { + return dbzObj; + } + // get number of milliseconds of the day + return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000; + } + }; + } + + private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof Long) { + switch (schema.name()) { + case Timestamp.SCHEMA_NAME: + return TimestampData.fromMillis((Long) dbzObj); + case MicroTimestamp.SCHEMA_NAME: + long micro = (long) dbzObj; + return TimestampData.fromMillis( + micro / 1000, (int) (micro % 1000 * 1000)); + case NanoTimestamp.SCHEMA_NAME: + long nano = (long) dbzObj; + return TimestampData.fromMillis( + nano / 1000_000, (int) (nano % 1000_000)); + } + } + LocalDateTime localDateTime = + TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); + return TimestampData.fromLocalDateTime(localDateTime); + } + }; + } + + private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp( + ZoneId serverTimeZone) { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof String) { + String str = (String) dbzObj; + // TIMESTAMP_LTZ type is encoded in string type + Instant instant = Instant.parse(str); + return TimestampData.fromLocalDateTime( + LocalDateTime.ofInstant(instant, serverTimeZone)); + } + throw new IllegalArgumentException( + "Unable to convert to TimestampData from unexpected value '" + + dbzObj + + "' of type " + + dbzObj.getClass().getName()); + } + }; + } + + private static DeserializationRuntimeConverter convertToString() { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return BinaryStringData.fromString(dbzObj.toString()); + } + }; + } + + private static DeserializationRuntimeConverter convertToBinary() { + return new 
DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + if (dbzObj instanceof byte[]) { + return dbzObj; + } else if (dbzObj instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) dbzObj; + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } else { + throw new UnsupportedOperationException( + "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName()); + } + } + }; + } + + private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) { + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + BigDecimal bigDecimal; + if (dbzObj instanceof byte[]) { + // decimal.handling.mode=precise + bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj); + } else if (dbzObj instanceof String) { + // decimal.handling.mode=string + bigDecimal = new BigDecimal((String) dbzObj); + } else if (dbzObj instanceof Double) { + // decimal.handling.mode=double + bigDecimal = BigDecimal.valueOf((Double) dbzObj); + } else { + if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { + SpecialValueDecimal decimal = + VariableScaleDecimal.toLogical((Struct) dbzObj); + bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO); + } else { + // fallback to string + bigDecimal = new BigDecimal(dbzObj.toString()); + } + } + return DecimalData.fromBigDecimal(bigDecimal, precision, scale); + } + }; + } + + private static DeserializationRuntimeConverter createRowConverter( + RowType rowType, ZoneId serverTimeZone) { + final DeserializationRuntimeConverter[] fieldConverters = + rowType.getFields().stream() + .map(DataField::getType) + .map(logicType -> createConverter(logicType, serverTimeZone)) + .toArray(DeserializationRuntimeConverter[]::new); + final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + final BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); + + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws Exception { + Struct struct = (Struct) dbzObj; + int arity = fieldNames.length; + Object[] fields = new Object[arity]; + for (int i = 0; i < arity; i++) { + String fieldName = fieldNames[i]; + Field field = schema.field(fieldName); + if (field == null) { + fields[i] = null; + } else { + Object fieldValue = struct.getWithoutDefault(fieldName); + Schema fieldSchema = schema.field(fieldName).schema(); + Object convertedField = + convertField(fieldConverters[i], fieldValue, fieldSchema); + fields[i] = convertedField; + } + } + return generator.generate(fields); + } + }; + } + + private static Object convertField( + DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema) + throws Exception { + if (fieldValue == null) { + return null; + } else { + return fieldConverter.convert(fieldValue, fieldSchema); + } + } + + private static DeserializationRuntimeConverter wrapIntoNullableConverter( + DeserializationRuntimeConverter converter) { + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws 
+                if (dbzObj == null) {
+                    return null;
+                }
+                return converter.convert(dbzObj, schema);
+            }
+        };
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SourceRecordEventDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SourceRecordEventDeserializer.java
new file mode 100644
index 000000000..cb38fef1d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SourceRecordEventDeserializer.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.debezium.event;
+
+import com.ververica.cdc.common.annotation.Internal;
+import com.ververica.cdc.common.event.DataChangeEvent;
+import com.ververica.cdc.common.event.Event;
+import com.ververica.cdc.common.event.EventDeserializer;
+import com.ververica.cdc.common.event.SchemaChangeEvent;
+import com.ververica.cdc.common.event.TableId;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Deserializer to deserialize {@link SourceRecord} to {@link Event}. */
+@Internal
+public abstract class SourceRecordEventDeserializer implements EventDeserializer<SourceRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(SourceRecordEventDeserializer.class);
+
+    @Override
+    public List<Event> deserialize(SourceRecord record) throws Exception {
+        if (isDataChangeRecord(record)) {
+            LOG.trace("Process data change record: {}", record);
+            return deserializeDataChangeRecord(record);
+        } else if (isSchemaChangeRecord(record)) {
+            LOG.trace("Process schema change record: {}", record);
+            return deserializeSchemaChangeRecord(record);
+        } else {
+            LOG.trace("Ignored other record: {}", record);
+            return Collections.emptyList();
+        }
+    }
+
+    /** Whether the given record is a data change record. */
+    protected abstract boolean isDataChangeRecord(SourceRecord record);
+
+    /** Whether the given record is a schema change record. */
+    protected abstract boolean isSchemaChangeRecord(SourceRecord record);
+
+    /** Deserialize given data change record to {@link DataChangeEvent}. */
+    protected abstract List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record)
+            throws Exception;
+
+    /** Deserialize given schema change record to {@link SchemaChangeEvent}. */
+    protected abstract List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record)
+            throws Exception;
+
+    /** Get {@link TableId} from given record. */
+    protected abstract TableId getTableId(SourceRecord record);
+
+    /** Get metadata from given record. */
+    protected abstract Map<String, String> getMetadata(SourceRecord record);
+
+    public static Schema fieldSchema(Schema schema, String fieldName) {
+        return schema.field(fieldName).schema();
+    }
+
+    public static Struct fieldStruct(Struct value, String fieldName) {
+        return value.getStruct(fieldName);
+    }
+}
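
For readers wiring a new connector against this base class, the following sketch shows one way a subclass could implement the hooks above. It is illustrative only and not part of this patch: the class name MyCdcEventDeserializer, the package com.example.cdc, the source-struct field names "db" and "table", and the toRecordData(...) helper are all assumptions. A real implementation would build RecordData with the DeserializationRuntimeConverters added earlier in this change and would pick the TableId factory overload that matches the connector's namespace layout.

package com.example.cdc; // hypothetical package, not part of this patch

import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.debezium.event.SourceRecordEventDeserializer;

import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Illustrative subclass of SourceRecordEventDeserializer; names below are assumptions. */
public class MyCdcEventDeserializer extends SourceRecordEventDeserializer {

    private static final long serialVersionUID = 1L;

    @Override
    protected boolean isDataChangeRecord(SourceRecord record) {
        // Debezium data change records carry an envelope schema (before/after/op/source).
        Schema valueSchema = record.valueSchema();
        return valueSchema != null && Envelope.isEnvelopeSchema(valueSchema);
    }

    @Override
    protected boolean isSchemaChangeRecord(SourceRecord record) {
        // Kept trivial here; a real connector would detect its own DDL/history records.
        return false;
    }

    @Override
    protected List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record) {
        // Only the insert path is sketched; updates and deletes would inspect
        // Envelope.FieldName.OPERATION and emit the matching DataChangeEvent factory call.
        Struct value = (Struct) record.value();
        Struct after = fieldStruct(value, Envelope.FieldName.AFTER);
        return Collections.singletonList(
                DataChangeEvent.insertEvent(getTableId(record), toRecordData(after)));
    }

    @Override
    protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
        return Collections.emptyList();
    }

    @Override
    protected TableId getTableId(SourceRecord record) {
        // "db" and "table" are placeholders; the actual field names in the Debezium
        // source struct vary by connector.
        Struct source = fieldStruct((Struct) record.value(), Envelope.FieldName.SOURCE);
        return TableId.tableId(source.getString("db"), source.getString("table"));
    }

    @Override
    protected Map<String, String> getMetadata(SourceRecord record) {
        return Collections.emptyMap();
    }

    private RecordData toRecordData(Struct after) {
        // Placeholder: a real implementation would run the row converter built by
        // createRowConverter(rowType, serverTimeZone) from this patch over the struct.
        throw new UnsupportedOperationException("sketch only");
    }
}

In a real pipeline such a deserializer would be invoked once per SourceRecord emitted by the Debezium source, with toRecordData(...) replaced by the converter chain shown earlier.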