[cdc-pipeline-connector][mysql] Introduce mysql cdc pipeline DataSource

pull/2810/head
Jiabao Sun 1 year ago committed by Hang Ruan
parent a328f9895a
commit d0cce26b76

@@ -20,6 +20,7 @@ import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.data.RecordData;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -82,7 +83,8 @@ public class DataChangeEvent implements ChangeEvent, Serializable {
/** Creates a {@link DataChangeEvent} instance that describes the insert event. */
public static DataChangeEvent insertEvent(TableId tableId, RecordData after) {
return new DataChangeEvent(tableId, null, after, OperationType.INSERT, null);
return new DataChangeEvent(
tableId, null, after, OperationType.INSERT, Collections.emptyMap());
}
/**
@@ -95,7 +97,8 @@ public class DataChangeEvent implements ChangeEvent, Serializable {
/** Creates a {@link DataChangeEvent} instance that describes the delete event. */
public static DataChangeEvent deleteEvent(TableId tableId, RecordData before) {
return new DataChangeEvent(tableId, before, null, OperationType.DELETE, null);
return new DataChangeEvent(
tableId, before, null, OperationType.DELETE, Collections.emptyMap());
}
/**
@@ -109,7 +112,8 @@ public class DataChangeEvent implements ChangeEvent, Serializable {
/** Creates a {@link DataChangeEvent} instance that describes the update event. */
public static DataChangeEvent updateEvent(
TableId tableId, RecordData before, RecordData after) {
return new DataChangeEvent(tableId, before, after, OperationType.UPDATE, null);
return new DataChangeEvent(
tableId, before, after, OperationType.UPDATE, Collections.emptyMap());
}
/**
@@ -122,7 +126,8 @@ public class DataChangeEvent implements ChangeEvent, Serializable {
/** Creates a {@link DataChangeEvent} instance that describes the replace event. */
public static DataChangeEvent replaceEvent(TableId tableId, RecordData after) {
return new DataChangeEvent(tableId, null, after, OperationType.REPLACE, null);
return new DataChangeEvent(
tableId, null, after, OperationType.REPLACE, Collections.emptyMap());
}
/**

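A minimal usage sketch (not part of this change): after the switch from null to Collections.emptyMap(), the convenience factories always carry a non-null meta map. The row layout and BinaryRecordDataGenerator below mirror the test utilities added later in this PR; the table and field names are hypothetical, and the meta() accessor on DataChangeEvent is assumed.

import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;

public class DataChangeEventExample {
    public static void main(String[] args) {
        // Hypothetical table with an INT id and a STRING name column.
        TableId tableId = TableId.tableId("app_db", "orders");
        BinaryRecordDataGenerator generator =
                new BinaryRecordDataGenerator(RowType.of(DataTypes.INT(), DataTypes.STRING()));

        DataChangeEvent insert =
                DataChangeEvent.insertEvent(
                        tableId,
                        generator.generate(new Object[] {1, BinaryStringData.fromString("user_1")}));
        // meta() is now an empty map rather than null, so consumers can read it without null checks.
        System.out.println(insert.meta().isEmpty());
    }
}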
@@ -0,0 +1,30 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.event;
import com.ververica.cdc.common.annotation.Public;
import java.io.Serializable;
import java.util.List;
/** Deserializer to deserialize given record to {@link Event}. */
@Public
public interface EventDeserializer<T> extends Serializable {
/** Deserialize given record to {@link Event}s. */
List<? extends Event> deserialize(T record) throws Exception;
}
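
A hypothetical implementation sketch to illustrate the contract: one input record may expand into zero or more Events. The class name and the String record type are made up for this example; the real implementation in this PR is the MySqlEventDeserializer further below.

import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.EventDeserializer;

import java.util.Collections;
import java.util.List;

/** Trivial deserializer that drops every record (illustration only). */
public class NoOpEventDeserializer implements EventDeserializer<String> {
    @Override
    public List<? extends Event> deserialize(String record) {
        // A real deserializer would parse the record into DataChangeEvents
        // and/or SchemaChangeEvents; here we simply emit nothing.
        return Collections.emptyList();
    }
}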

@@ -31,15 +31,39 @@ import java.util.List;
@PublicEvolving
public interface MetadataAccessor {
/** List all namespaces from external systems. */
List<String> listNamespaces() throws UnsupportedOperationException;
/**
* List all namespaces from external systems.
*
* @return The list of namespaces
* @throws UnsupportedOperationException Thrown, if the external system does not support
* namespace.
*/
List<String> listNamespaces();
/** List schemas by namespace from external systems. */
List<String> listSchemas(@Nullable String namespace) throws UnsupportedOperationException;
/**
* List all schemas from external systems.
*
* @param namespace The namespace to list schemas from. If null, list schemas from all
* namespaces.
* @return The list of schemas
* @throws UnsupportedOperationException Thrown, if the external system does not support schema.
*/
List<String> listSchemas(@Nullable String namespace);
/** List tables by namespace and schema from external systems. */
/**
* List tables by namespace and schema from external systems.
*
* @param namespace The namespace to list tables from. If null, list tables from all namespaces.
* @param schemaName The schema to list tables from. If null, list tables from all schemas.
* @return The list of {@link TableId}s.
*/
List<TableId> listTables(@Nullable String namespace, @Nullable String schemaName);
/** Get the schema of the given table. */
/**
* Get the {@link Schema} of the given table.
*
* @param tableId The {@link TableId} of the given table.
* @return The {@link Schema} of the table.
*/
Schema getTableSchema(TableId tableId);
}
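
A hedged usage sketch (not from this change) of how a pipeline component might walk metadata through this interface; "accessor" stands for any implementation, for example the MySqlMetadataAccessor added later in this PR.

import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.source.MetadataAccessor;

import java.util.List;

public class MetadataWalkExample {
    static void printAllTableSchemas(MetadataAccessor accessor) {
        // Passing null for namespace and schema lists tables from everything the
        // external system exposes; systems without a namespace level (e.g. MySQL)
        // may throw UnsupportedOperationException from listNamespaces().
        List<TableId> tables = accessor.listTables(null, null);
        for (TableId tableId : tables) {
            Schema schema = accessor.getTableSchema(tableId);
            System.out.println(tableId + " -> " + schema);
        }
    }
}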

@@ -0,0 +1,153 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2023 Ververica Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-pipeline-connectors</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
<include>io.debezium:debezium-api</include>
<include>io.debezium:debezium-embedded</include>
<include>io.debezium:debezium-core</include>
<include>io.debezium:debezium-ddl-parser</include>
<include>io.debezium:debezium-connector-mysql</include>
<include>com.ververica:flink-connector-debezium</include>
<include>com.ververica:flink-connector-mysql-cdc</include>
<include>org.antlr:antlr4-runtime</include>
<include>org.apache.kafka:*</include>
<include>mysql:mysql-connector-java</include>
<include>com.zendesk:mysql-binlog-connector-java</include>
<include>com.fasterxml.*:*</include>
<include>com.google.guava:*</include>
<include>com.esri.geometry:esri-geometry-api</include>
<include>com.zaxxer:HikariCP</include>
<!-- Include fixed version 30.1.1-jre-14.0 of flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.kafka:*</artifact>
<excludes>
<exclude>kafka/kafka-version.properties</exclude>
<exclude>LICENSE</exclude>
<!-- Does not contain anything relevant.
Cites a binary dependency on jersey, but this is neither reflected in the
dependency graph, nor are any jersey files bundled. -->
<exclude>NOTICE</exclude>
<exclude>common/**</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.org.apache.kafka
</shadedPattern>
</relocation>
<relocation>
<pattern>org.antlr</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.org.antlr
</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.com.fasterxml
</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.com.google
</shadedPattern>
</relocation>
<relocation>
<pattern>com.esri.geometry</pattern>
<shadedPattern>com.ververica.cdc.connectors.shaded.com.esri.geometry</shadedPattern>
</relocation>
<relocation>
<pattern>com.zaxxer</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.com.zaxxer
</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@@ -0,0 +1,356 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.factory;
import org.apache.flink.table.api.ValidationException;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.ConfigOption;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.factories.DataSourceFactory;
import com.ververica.cdc.common.factories.Factory;
import com.ververica.cdc.common.schema.Selectors;
import com.ververica.cdc.common.source.DataSource;
import com.ververica.cdc.connectors.mysql.source.MySqlDataSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import com.ververica.cdc.connectors.mysql.utils.OptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static com.ververica.cdc.debezium.utils.JdbcUrlUtils.getJdbcProperties;
import static org.apache.flink.util.Preconditions.checkState;
/** A {@link Factory} to create {@link MySqlDataSource}. */
@Internal
public class MySqlDataSourceFactory implements DataSourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(MySqlDataSourceFactory.class);
public static final String IDENTIFIER = "mysql";
@Override
public DataSource createDataSource(Context context) {
final Configuration config = context.getConfiguration();
String hostname = config.get(HOSTNAME);
int port = config.get(PORT);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String tables = config.get(TABLES);
String serverId = validateAndGetServerId(config);
ZoneId serverTimeZone = getServerTimeZone(config);
StartupOptions startupOptions = getStartupOptions(config);
boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);
int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
double distributionFactorUpper =
config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
double distributionFactorLower =
config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean closeIdleReaders =
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
validateIntegerOption(
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
validateDistributionFactorUpper(distributionFactorUpper);
validateDistributionFactorLower(distributionFactorLower);
Map<String, String> configMap = config.toMap();
OptionUtils.printOptions(IDENTIFIER, config.toMap());
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(hostname)
.port(port)
.username(username)
.password(password)
.databaseList(".*")
.tableList(".*")
.startupOptions(startupOptions)
.serverId(serverId)
.serverTimeZone(serverTimeZone.getId())
.fetchSize(fetchSize)
.splitSize(splitSize)
.splitMetaGroupSize(splitMetaGroupSize)
.distributionFactorLower(distributionFactorLower)
.distributionFactorUpper(distributionFactorUpper)
.heartbeatInterval(heartbeatInterval)
.connectTimeout(connectTimeout)
.connectMaxRetries(connectMaxRetries)
.connectionPoolSize(connectionPoolSize)
.closeIdleReaders(closeIdleReaders)
.includeSchemaChanges(includeSchemaChanges)
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap));
Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
configFactory.tableList(getTableList(configFactory.createConfig(0), selectors));
return new MySqlDataSource(configFactory);
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(USERNAME);
options.add(PASSWORD);
options.add(TABLES);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PORT);
options.add(SERVER_TIME_ZONE);
options.add(SERVER_ID);
options.add(SCAN_STARTUP_MODE);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS);
options.add(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
options.add(CHUNK_META_GROUP_SIZE);
options.add(SCAN_SNAPSHOT_FETCH_SIZE);
options.add(CONNECT_TIMEOUT);
options.add(CONNECTION_POOL_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(CONNECT_MAX_RETRIES);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(HEARTBEAT_INTERVAL);
options.add(SCHEMA_CHANGE_ENABLED);
return options;
}
@Override
public String identifier() {
return IDENTIFIER;
}
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
private static String[] getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) {
return MySqlSchemaUtils.listTables(sourceConfig, null).stream()
.filter(selectors::isMatch)
.map(TableId::toString)
.toArray(String[]::new);
}
private static StartupOptions getStartupOptions(Configuration config) {
String modeString = config.get(SCAN_STARTUP_MODE);
switch (modeString.toLowerCase()) {
case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
case SCAN_STARTUP_MODE_VALUE_EARLIEST:
return StartupOptions.earliest();
case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
validateSpecificOffset(config);
return getSpecificOffset(config);
case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
return StartupOptions.timestamp(
config.get(SCAN_STARTUP_TIMESTAMP_MILLIS));
default:
throw new ValidationException(
String.format(
"Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s], but was: %s",
SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_LATEST,
SCAN_STARTUP_MODE_VALUE_EARLIEST,
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
SCAN_STARTUP_MODE_VALUE_TIMESTAMP,
modeString));
}
}
private static void validateSpecificOffset(Configuration config) {
Optional<String> gtidSet =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
Optional<String> binlogFilename =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
Optional<Long> binlogPosition =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
if (!gtidSet.isPresent() && !(binlogFilename.isPresent() && binlogPosition.isPresent())) {
throw new ValidationException(
String.format(
"Unable to find a valid binlog offset. Either %s, or %s and %s are required.",
SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key(),
SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key(),
SCAN_STARTUP_SPECIFIC_OFFSET_POS.key()));
}
}
private static StartupOptions getSpecificOffset(Configuration config) {
BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
// GTID set
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
.ifPresent(offsetBuilder::setGtidSet);
// Binlog file + pos
Optional<String> binlogFilename =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
Optional<Long> binlogPosition =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
if (binlogFilename.isPresent() && binlogPosition.isPresent()) {
offsetBuilder.setBinlogFilePosition(binlogFilename.get(), binlogPosition.get());
} else {
offsetBuilder.setBinlogFilePosition("", 0);
}
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
.ifPresent(offsetBuilder::setSkipEvents);
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
.ifPresent(offsetBuilder::setSkipRows);
return StartupOptions.specificOffset(offsetBuilder.build());
}
private String validateAndGetServerId(Configuration configuration) {
final String serverIdValue = configuration.get(SERVER_ID);
if (serverIdValue != null) {
// validation
try {
ServerIdRange.from(serverIdValue);
} catch (Exception e) {
throw new ValidationException(
String.format(
"The value of option 'server-id' is invalid: '%s'", serverIdValue),
e);
}
}
return serverIdValue;
}
/** Checks that the value of the given integer option is valid. */
private void validateIntegerOption(
ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
checkState(
optionValue > exclusiveMin,
String.format(
"The value of option '%s' must larger than %d, but is %d",
option.key(), exclusiveMin, optionValue));
}
/** Checks that the value of the given even distribution factor upper bound is valid. */
private void validateDistributionFactorUpper(double distributionFactorUpper) {
checkState(
doubleCompare(distributionFactorUpper, 1.0d) >= 0,
String.format(
"The value of option '%s' must larger than or equals %s, but is %s",
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
1.0d,
distributionFactorUpper));
}
/** Checks that the value of the given even distribution factor lower bound is valid. */
private void validateDistributionFactorLower(double distributionFactorLower) {
checkState(
doubleCompare(distributionFactorLower, 0.0d) >= 0
&& doubleCompare(distributionFactorLower, 1.0d) <= 0,
String.format(
"The value of option '%s' must between %s and %s inclusively, but is %s",
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
0.0d,
1.0d,
distributionFactorLower));
}
/** Returns the configured server time zone, falling back to the system default when it is not set. */
private static ZoneId getServerTimeZone(Configuration config) {
final String serverTimeZone = config.get(SERVER_TIME_ZONE);
if (serverTimeZone != null) {
return ZoneId.of(serverTimeZone);
} else {
LOG.warn(
"{} is not set, which might cause data inconsistencies for time-related fields.",
SERVER_TIME_ZONE.key());
return ZoneId.systemDefault();
}
}
}
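
A hedged sketch (not part of this change) of the minimal configuration this factory expects. The keys match the required and optional options declared in MySqlDataSourceOptions; the credentials and table pattern are hypothetical, and Configuration.fromMap is assumed to mirror Flink's Configuration API. How the factory Context is constructed is left to the pipeline runtime.

import com.ververica.cdc.common.configuration.Configuration;

import java.util.HashMap;
import java.util.Map;

public class MySqlSourceOptionsExample {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("hostname", "localhost");
        options.put("port", "3306");
        options.put("username", "flink_cdc");          // hypothetical user
        options.put("password", "secret");             // hypothetical password
        options.put("tables", "app_db.orders_[0-9]+"); // the dot separates database and table names
        options.put("server-id", "5400-5404");         // a range is recommended for parallel readers
        options.put("scan.startup.mode", "initial");
        Configuration config = Configuration.fromMap(options); // assumed helper, mirrors Flink's API
        System.out.println(config);
    }
}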

@@ -0,0 +1,56 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.source.DataSource;
import com.ververica.cdc.common.source.EventSourceProvider;
import com.ververica.cdc.common.source.FlinkSourceProvider;
import com.ververica.cdc.common.source.MetadataAccessor;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import java.time.ZoneId;
/** A {@link DataSource} for the MySQL CDC connector. */
@Internal
public class MySqlDataSource implements DataSource {
private final MySqlSourceConfigFactory configFactory;
private final MySqlSourceConfig sourceConfig;
public MySqlDataSource(MySqlSourceConfigFactory configFactory) {
this.configFactory = configFactory;
this.sourceConfig = configFactory.createConfig(0);
}
@Override
public EventSourceProvider getEventSourceProvider() {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
ZoneId.of(sourceConfig.getServerTimeZone()),
sourceConfig.isIncludeSchemaChanges());
return FlinkSourceProvider.of(new MySqlSource<>(configFactory, deserializer));
}
@Override
public MetadataAccessor getMetadataAccessor() {
return new MySqlMetadataAccessor(sourceConfig);
}
}
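
A short wiring sketch (not from this change): once a MySqlSourceConfigFactory is prepared, the data source hands out both the Flink source provider and the metadata accessor. The connection parameters below are hypothetical.

import com.ververica.cdc.common.source.EventSourceProvider;
import com.ververica.cdc.common.source.MetadataAccessor;
import com.ververica.cdc.connectors.mysql.source.MySqlDataSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;

public class MySqlDataSourceExample {
    public static void main(String[] args) {
        MySqlSourceConfigFactory configFactory =
                new MySqlSourceConfigFactory()
                        .hostname("localhost")
                        .port(3306)
                        .username("flink_cdc")   // hypothetical credentials
                        .password("secret")
                        .databaseList("app_db")
                        .tableList("app_db.orders");

        MySqlDataSource dataSource = new MySqlDataSource(configFactory);
        EventSourceProvider provider = dataSource.getEventSourceProvider();
        MetadataAccessor metadata = dataSource.getMetadataAccessor();
        System.out.println(provider + " / " + metadata);
    }
}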

@@ -0,0 +1,232 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source;
import com.ververica.cdc.common.annotation.Experimental;
import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.configuration.ConfigOption;
import com.ververica.cdc.common.configuration.ConfigOptions;
import java.time.Duration;
/** Configurations for {@link MySqlDataSource}. */
@PublicEvolving
public class MySqlDataSourceOptions {
public static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
.withDescription("IP address or hostname of the MySQL database server.");
public static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(3306)
.withDescription("Integer port number of the MySQL database server.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription(
"Name of the MySQL database to use when connecting to the MySQL database server.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription(
"Password to use when connecting to the MySQL database server.");
public static final ConfigOption<String> TABLES =
ConfigOptions.key("tables")
.stringType()
.noDefaultValue()
.withDescription(
"Table names of the MySQL tables to monitor. Regular expressions are supported. "
+ "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+ "If there is a need to use a dot (.) in a regular expression to match any character, "
+ "it is necessary to escape the dot with a backslash."
+ "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");
public static final ConfigOption<String> SERVER_TIME_ZONE =
ConfigOptions.key("server-time-zone")
.stringType()
.noDefaultValue()
.withDescription(
"The session time zone in database server. If not set, then "
+ "ZoneId.systemDefault() is used to determine the server time zone.");
public static final ConfigOption<String> SERVER_ID =
ConfigOptions.key("server-id")
.stringType()
.noDefaultValue()
.withDescription(
"A numeric ID or a numeric ID range of this database client, "
+ "The numeric ID syntax is like '5400', the numeric ID range syntax "
+ "is like '5400-5408', The numeric ID range syntax is recommended when "
+ "'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all "
+ "currently-running database processes in the MySQL cluster. This connector"
+ " joins the MySQL cluster as another server (with this unique ID) "
+ "so it can read the binlog. By default, a random number is generated between"
+ " 5400 and 6400, though we recommend setting an explicit value.");
public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
ConfigOptions.key("scan.incremental.snapshot.chunk.size")
.intType()
.defaultValue(8096)
.withDescription(
"The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.");
public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
ConfigOptions.key("scan.snapshot.fetch.size")
.intType()
.defaultValue(1024)
.withDescription(
"The maximum fetch size for per poll when read table snapshot.");
public static final ConfigOption<Duration> CONNECT_TIMEOUT =
ConfigOptions.key("connect.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.");
public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
ConfigOptions.key("connection.pool.size")
.intType()
.defaultValue(20)
.withDescription("The connection pool size.");
public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
ConfigOptions.key("connect.max-retries")
.intType()
.defaultValue(3)
.withDescription(
"The max retry times that the connector should retry to build MySQL database server connection.");
public static final ConfigOption<String> SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.stringType()
.defaultValue("initial")
.withDescription(
"Optional startup mode for MySQL CDC consumer, valid enumerations are "
+ "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n"
+ "or \"specific-offset\"");
public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE =
ConfigOptions.key("scan.startup.specific-offset.file")
.stringType()
.noDefaultValue()
.withDescription(
"Optional binlog file name used in case of \"specific-offset\" startup mode");
public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
ConfigOptions.key("scan.startup.specific-offset.pos")
.longType()
.noDefaultValue()
.withDescription(
"Optional binlog file position used in case of \"specific-offset\" startup mode");
public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =
ConfigOptions.key("scan.startup.specific-offset.gtid-set")
.stringType()
.noDefaultValue()
.withDescription(
"Optional GTID set used in case of \"specific-offset\" startup mode");
public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS =
ConfigOptions.key("scan.startup.specific-offset.skip-events")
.longType()
.noDefaultValue()
.withDescription(
"Optional number of events to skip after the specific starting offset");
public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS =
ConfigOptions.key("scan.startup.specific-offset.skip-rows")
.longType()
.noDefaultValue()
.withDescription("Optional number of rows to skip after the specific offset");
public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
ConfigOptions.key("scan.startup.timestamp-millis")
.longType()
.noDefaultValue()
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");
public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
ConfigOptions.key("heartbeat.interval")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"Optional interval of sending heartbeat event for tracing the latest available binlog offsets");
// ----------------------------------------------------------------------------
// experimental options, won't add them to documentation
// ----------------------------------------------------------------------------
@Experimental
public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
ConfigOptions.key("chunk-meta.group.size")
.intType()
.defaultValue(1000)
.withDescription(
"The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.");
@Experimental
public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
.doubleType()
.defaultValue(1000.0d)
.withFallbackKeys("split-key.even-distribution.factor.upper-bound")
.withDescription(
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
@Experimental
public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
.doubleType()
.defaultValue(0.05d)
.withFallbackKeys("split-key.even-distribution.factor.lower-bound")
.withDescription(
"The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to close idle readers at the end of the snapshot phase. This feature depends on "
+ "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
+ "greater than or equal to 1.14 when enabling this feature.");
@Experimental
public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
ConfigOptions.key("schema-change.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether , by default is false.");
}
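
A small sketch (not part of this change) showing how these options are read: values absent from the Configuration fall back to the declared defaults, and typed options such as CONNECT_TIMEOUT come back as Duration. Configuration.fromMap is assumed to mirror Flink's Configuration API.

import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions;

import java.time.Duration;
import java.util.Collections;

public class MySqlOptionDefaultsExample {
    public static void main(String[] args) {
        Configuration config =
                Configuration.fromMap(Collections.singletonMap("hostname", "localhost"));

        String hostname = config.get(MySqlDataSourceOptions.HOSTNAME);         // "localhost"
        int port = config.get(MySqlDataSourceOptions.PORT);                    // 3306, the default
        Duration timeout = config.get(MySqlDataSourceOptions.CONNECT_TIMEOUT); // 30s, the default
        System.out.println(hostname + ":" + port + ", timeout=" + timeout);
    }
}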

@@ -0,0 +1,92 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.debezium.event.DebeziumEventDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/** Event deserializer for {@link MySqlDataSource}. */
@Internal
public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final long serialVersionUID = 1L;
public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.mysql.SchemaChangeKey";
private final boolean includeSchemaChanges;
public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
ZoneId serverTimeZone,
boolean includeSchemaChanges) {
super(changelogMode, serverTimeZone);
this.includeSchemaChanges = includeSchemaChanges;
}
@Override
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
// TODO: support schema change event
return Collections.emptyList();
}
return Collections.emptyList();
}
@Override
protected boolean isDataChangeRecord(SourceRecord record) {
Schema valueSchema = record.valueSchema();
Struct value = (Struct) record.value();
return value != null
&& valueSchema != null
&& valueSchema.field(Envelope.FieldName.OPERATION) != null
&& value.getString(Envelope.FieldName.OPERATION) != null;
}
@Override
protected boolean isSchemaChangeRecord(SourceRecord record) {
Schema keySchema = record.keySchema();
return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
}
@Override
protected TableId getTableId(SourceRecord record) {
if (isDataChangeRecord(record)) {
String[] parts = record.topic().split("\\.");
return TableId.tableId(parts[1], parts[2]);
}
// TODO: get table id from schema change record
return null;
}
@Override
protected Map<String, String> getMetadata(SourceRecord record) {
return Collections.emptyMap();
}
}
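
For context (not part of this change): Debezium topics for MySQL data change records have the form <topic-prefix>.<database>.<table>, which is why getTableId() above takes parts[1] and parts[2]. The topic name below is hypothetical.

import com.ververica.cdc.common.event.TableId;

public class TopicToTableIdExample {
    public static void main(String[] args) {
        String topic = "mysql_binlog_source.app_db.orders"; // hypothetical topic
        String[] parts = topic.split("\\.");
        TableId tableId = TableId.tableId(parts[1], parts[2]); // app_db.orders
        System.out.println(tableId);
    }
}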

@@ -0,0 +1,86 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.source.MetadataAccessor;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import io.debezium.connector.mysql.MySqlPartition;
import javax.annotation.Nullable;
import java.util.List;
/** {@link MetadataAccessor} for {@link MySqlDataSource}. */
@Internal
public class MySqlMetadataAccessor implements MetadataAccessor {
private final MySqlSourceConfig sourceConfig;
private final MySqlPartition partition;
public MySqlMetadataAccessor(MySqlSourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
this.partition =
new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
}
/**
* Always throws {@link UnsupportedOperationException} because MySQL does not support namespaces.
*/
@Override
public List<String> listNamespaces() {
throw new UnsupportedOperationException("List namespace is not supported by MySQL.");
}
/**
* List all databases from MySQL.
*
* @param namespace This parameter is ignored because MySQL does not support namespace.
* @return The list of databases
*/
@Override
public List<String> listSchemas(@Nullable String namespace) {
return MySqlSchemaUtils.listDatabases(sourceConfig);
}
/**
* List tables from MySQL.
*
* @param namespace This parameter is ignored because MySQL does not support namespace.
* @param dbName The database to list tables from. If null, list tables from all databases.
* @return The list of {@link TableId}s.
*/
@Override
public List<TableId> listTables(@Nullable String namespace, @Nullable String dbName) {
return MySqlSchemaUtils.listTables(sourceConfig, dbName);
}
/**
* Get the {@link Schema} of the given table.
*
* @param tableId The {@link TableId} of the given table.
* @return The {@link Schema} of the table.
*/
@Override
public Schema getTableSchema(TableId tableId) {
return MySqlSchemaUtils.getTableSchema(sourceConfig, partition, tableId);
}
}

@@ -0,0 +1,158 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.utils;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Column;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote;
/** Utilities for converting from debezium {@link Table} types to {@link Schema}. */
public class MySqlSchemaUtils {
private static final Logger LOG = LoggerFactory.getLogger(MySqlSchemaUtils.class);
public static List<String> listDatabases(MySqlSourceConfig sourceConfig) {
try (JdbcConnection jdbc = createMySqlConnection(sourceConfig)) {
return listDatabases(jdbc);
} catch (SQLException e) {
throw new RuntimeException("Error to list databases: " + e.getMessage(), e);
}
}
public static List<TableId> listTables(
MySqlSourceConfig sourceConfig, @Nullable String dbName) {
try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) {
List<String> databases =
dbName != null ? Collections.singletonList(dbName) : listDatabases(jdbc);
List<TableId> tableIds = new ArrayList<>();
for (String database : databases) {
tableIds.addAll(listTables(jdbc, database));
}
return tableIds;
} catch (SQLException e) {
throw new RuntimeException("Error to list databases: " + e.getMessage(), e);
}
}
public static Schema getTableSchema(
MySqlSourceConfig sourceConfig, MySqlPartition partition, TableId tableId) {
try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) {
return getTableSchema(partition, tableId, sourceConfig, jdbc);
} catch (SQLException e) {
throw new RuntimeException("Error to get table schema: " + e.getMessage(), e);
}
}
public static List<String> listDatabases(JdbcConnection jdbc) throws SQLException {
// -------------------
// READ DATABASE NAMES
// -------------------
// Get the list of databases ...
LOG.info("Read list of available databases");
final List<String> databaseNames = new ArrayList<>();
jdbc.query(
"SHOW DATABASES WHERE `database` NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')",
rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
}
});
LOG.info("\t list of available databases are: {}", databaseNames);
return databaseNames;
}
public static List<TableId> listTables(JdbcConnection jdbc, String dbName) throws SQLException {
// ----------------
// READ TABLE NAMES
// ----------------
// Get the list of table IDs for each database. We can't use a prepared statement with
// MySQL, so we have to build the SQL statement each time. Although in other cases this
// might lead to SQL injection, in our case we are reading the database names from the
// database and not taking them from the user ...
LOG.info("Read list of available tables in {}", dbName);
final List<TableId> tableIds = new ArrayList<>();
jdbc.query(
"SHOW FULL TABLES IN " + quote(dbName) + " where Table_Type = 'BASE TABLE'",
rs -> {
while (rs.next()) {
tableIds.add(TableId.tableId(dbName, rs.getString(1)));
}
});
LOG.info("\t list of available tables are: {}", tableIds);
return tableIds;
}
public static Schema getTableSchema(
MySqlPartition partition,
TableId tableId,
MySqlSourceConfig sourceConfig,
MySqlConnection jdbc) {
// fetch table schemas
try (MySqlSchema mySqlSchema =
new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) {
TableChanges.TableChange tableSchema =
mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId));
return toSchema(tableSchema.getTable());
}
}
public static Schema toSchema(Table table) {
List<Column> columns =
table.columns().stream()
.map(MySqlSchemaUtils::toColumn)
.collect(Collectors.toList());
return Schema.newBuilder()
.setColumns(columns)
.primaryKey(table.primaryKeyColumnNames())
.comment(table.comment())
.build();
}
public static Column toColumn(io.debezium.relational.Column column) {
return Column.physicalColumn(
column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment());
}
public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
return new io.debezium.relational.TableId(
tableId.getSchemaName(), null, tableId.getTableName());
}
private MySqlSchemaUtils() {}
}
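
A hedged usage sketch (not from this change) of the utilities above; it needs a reachable MySQL instance, and the connection parameters are hypothetical.

import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.utils.MySqlSchemaUtils;

import java.util.List;

public class MySqlSchemaUtilsExample {
    public static void main(String[] args) {
        MySqlSourceConfig sourceConfig =
                new MySqlSourceConfigFactory()
                        .hostname("localhost")
                        .port(3306)
                        .username("flink_cdc")   // hypothetical credentials
                        .password("secret")
                        .databaseList(".*")
                        .tableList(".*")
                        .createConfig(0);

        // Each call opens a short-lived JDBC connection, as implemented above.
        List<String> databases = MySqlSchemaUtils.listDatabases(sourceConfig);
        List<TableId> tables = MySqlSchemaUtils.listTables(sourceConfig, "app_db");
        System.out.println(databases + " / " + tables);
    }
}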

@@ -0,0 +1,186 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.utils;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import io.debezium.relational.Column;
/** Utilities for converting from MySQL types to {@link DataType}s. */
public class MySqlTypeUtils {
// ------ MySQL Type ------
// https://dev.mysql.com/doc/refman/8.0/en/data-types.html
private static final String BIT = "BIT";
private static final String TINYINT = "TINYINT";
private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL";
private static final String SMALLINT = "SMALLINT";
private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL";
private static final String MEDIUMINT = "MEDIUMINT";
private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL";
private static final String INT = "INT";
private static final String INT_UNSIGNED = "INT UNSIGNED";
private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL";
private static final String BIGINT = "BIGINT";
private static final String SERIAL = "SERIAL";
private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL";
private static final String REAL = "REAL";
private static final String REAL_UNSIGNED = "REAL UNSIGNED";
private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL";
private static final String FLOAT = "FLOAT";
private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL";
private static final String DOUBLE = "DOUBLE";
private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL";
private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED";
private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
"DOUBLE PRECISION UNSIGNED ZEROFILL";
private static final String NUMERIC = "NUMERIC";
private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL";
private static final String FIXED = "FIXED";
private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL";
private static final String DECIMAL = "DECIMAL";
private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL";
private static final String CHAR = "CHAR";
private static final String VARCHAR = "VARCHAR";
private static final String TINYTEXT = "TINYTEXT";
private static final String MEDIUMTEXT = "MEDIUMTEXT";
private static final String TEXT = "TEXT";
private static final String LONGTEXT = "LONGTEXT";
private static final String DATE = "DATE";
private static final String TIME = "TIME";
private static final String DATETIME = "DATETIME";
private static final String TIMESTAMP = "TIMESTAMP";
private static final String YEAR = "YEAR";
private static final String BINARY = "BINARY";
private static final String VARBINARY = "VARBINARY";
private static final String TINYBLOB = "TINYBLOB";
private static final String MEDIUMBLOB = "MEDIUMBLOB";
private static final String BLOB = "BLOB";
private static final String LONGBLOB = "LONGBLOB";
private static final String JSON = "JSON";
private static final String SET = "SET";
private static final String ENUM = "ENUM";
private static final String GEOMETRY = "GEOMETRY";
private static final String UNKNOWN = "UNKNOWN";
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
DataType dataType = convertFromColumn(column);
if (column.isOptional()) {
return dataType;
} else {
return dataType.notNull();
}
}
/**
* Returns a corresponding Flink data type from a debezium {@link Column}, with nullability always
* set to true.
*/
private static DataType convertFromColumn(Column column) {
String typeName = column.typeName();
switch (typeName) {
case TINYINT:
// MySQL does not have a boolean type; it uses tinyint(1) to represent booleans.
// Users should not use tinyint(1) to store numbers; although the JDBC URL parameter
// tinyInt1isBit=false can change the returned value, it is not a general solution.
// Note: mybatis and mysql-connector-java map tinyint(1) to boolean by default.
return column.length() == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT();
case TINYINT_UNSIGNED:
case TINYINT_UNSIGNED_ZEROFILL:
case SMALLINT:
return DataTypes.SMALLINT();
case SMALLINT_UNSIGNED:
case SMALLINT_UNSIGNED_ZEROFILL:
case INT:
case MEDIUMINT:
return DataTypes.INT();
case INT_UNSIGNED:
case INT_UNSIGNED_ZEROFILL:
case MEDIUMINT_UNSIGNED:
case MEDIUMINT_UNSIGNED_ZEROFILL:
case BIGINT:
return DataTypes.BIGINT();
case BIGINT_UNSIGNED:
case BIGINT_UNSIGNED_ZEROFILL:
case SERIAL:
return DataTypes.DECIMAL(20, 0);
case FLOAT:
case FLOAT_UNSIGNED:
case FLOAT_UNSIGNED_ZEROFILL:
return DataTypes.FLOAT();
case REAL:
case REAL_UNSIGNED:
case REAL_UNSIGNED_ZEROFILL:
case DOUBLE:
case DOUBLE_UNSIGNED:
case DOUBLE_UNSIGNED_ZEROFILL:
case DOUBLE_PRECISION:
case DOUBLE_PRECISION_UNSIGNED:
case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
return DataTypes.DOUBLE();
case NUMERIC:
case NUMERIC_UNSIGNED:
case NUMERIC_UNSIGNED_ZEROFILL:
case FIXED:
case FIXED_UNSIGNED:
case FIXED_UNSIGNED_ZEROFILL:
case DECIMAL:
case DECIMAL_UNSIGNED:
case DECIMAL_UNSIGNED_ZEROFILL:
return column.length() <= 38
? DataTypes.DECIMAL(column.length(), column.scale().orElse(0))
: DataTypes.STRING();
case TIME:
return column.length() >= 0 ? DataTypes.TIME(column.length()) : DataTypes.TIME();
case DATE:
return DataTypes.DATE();
case DATETIME:
case TIMESTAMP:
return column.length() >= 0
? DataTypes.TIMESTAMP_LTZ(column.length())
: DataTypes.TIMESTAMP_LTZ();
case CHAR:
return DataTypes.CHAR(column.length());
case VARCHAR:
return DataTypes.VARCHAR(column.length());
case TEXT:
return DataTypes.STRING();
case BINARY:
return DataTypes.BINARY(column.length());
case VARBINARY:
return DataTypes.VARBINARY(column.length());
case BLOB:
return DataTypes.BYTES();
default:
throw new UnsupportedOperationException(
String.format("Don't support MySQL type '%s' yet.", typeName));
}
}
private MySqlTypeUtils() {}
}
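
A hedged sketch (not part of this change) of how the mapping behaves for two representative columns; Column.editor() is Debezium's standard builder for relational columns.

import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.connectors.mysql.utils.MySqlTypeUtils;
import io.debezium.relational.Column;

public class MySqlTypeMappingExample {
    public static void main(String[] args) {
        // tinyint(1) NOT NULL is treated as a boolean flag.
        Column tinyint1 =
                Column.editor().name("is_active").type("TINYINT").length(1).optional(false).create();
        DataType booleanType = MySqlTypeUtils.fromDbzColumn(tinyint1); // BOOLEAN NOT NULL

        // BIGINT UNSIGNED does not fit into a Java long, so it maps to DECIMAL(20, 0).
        Column bigintUnsigned =
                Column.editor().name("id").type("BIGINT UNSIGNED").optional(true).create();
        DataType decimalType = MySqlTypeUtils.fromDbzColumn(bigintUnsigned); // DECIMAL(20, 0)

        System.out.println(booleanType + " / " + decimalType);
    }
}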

@@ -0,0 +1,15 @@
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.ververica.cdc.connectors.mysql.factory.MySqlDataSourceFactory
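
A hypothetical discovery sketch: the service file above registers the factory for Java's ServiceLoader. Its exact resource path (the interface it is registered under) is not visible in this diff, so the lookup below assumes registration under the Factory interface; the pipeline runtime may use its own discovery helper instead.

import com.ververica.cdc.common.factories.Factory;

import java.util.ServiceLoader;

public class FactoryDiscoveryExample {
    public static void main(String[] args) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if ("mysql".equals(factory.identifier())) {
                System.out.println("Found MySQL factory: " + factory.getClass().getName());
            }
        }
    }
}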

@@ -0,0 +1,238 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import com.ververica.cdc.common.data.binary.BinaryRecordData;
import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import io.debezium.jdbc.JdbcConnection;
import org.junit.After;
import org.junit.Test;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link MySqlSource} with {@link MySqlEventDeserializer}. */
public class MySqlEventSourceITCase extends MySqlSourceTestBase {
private final BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(
RowType.of(
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING()));
private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@After
public void clear() {
customerDatabase.dropDatabase();
}
@Test
public void testProduceDataChangeEvents() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(5000L);
env.setRestartStrategy(RestartStrategies.noRestart());
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
MySqlSource<Event> eventSource = getMySqlSource(sourceConfig);
DataStreamSource<Event> source =
env.fromSource(eventSource, WatermarkStrategy.noWatermarks(), "Event-Source");
CloseableIterator<Event> iterator = source.collectAsync();
env.executeAsync();
TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers");
List<BinaryRecordData> snapshotRecords =
customerRecordData(
Arrays.asList(
new Object[] {101, "user_1", "Shanghai", "123567891234"},
new Object[] {102, "user_2", "Shanghai", "123567891234"},
new Object[] {103, "user_3", "Shanghai", "123567891234"},
new Object[] {109, "user_4", "Shanghai", "123567891234"},
new Object[] {110, "user_5", "Shanghai", "123567891234"},
new Object[] {111, "user_6", "Shanghai", "123567891234"},
new Object[] {118, "user_7", "Shanghai", "123567891234"},
new Object[] {121, "user_8", "Shanghai", "123567891234"},
new Object[] {123, "user_9", "Shanghai", "123567891234"},
new Object[] {1009, "user_10", "Shanghai", "123567891234"},
new Object[] {1010, "user_11", "Shanghai", "123567891234"},
new Object[] {1011, "user_12", "Shanghai", "123567891234"},
new Object[] {1012, "user_13", "Shanghai", "123567891234"},
new Object[] {1013, "user_14", "Shanghai", "123567891234"},
new Object[] {1014, "user_15", "Shanghai", "123567891234"},
new Object[] {1015, "user_16", "Shanghai", "123567891234"},
new Object[] {1016, "user_17", "Shanghai", "123567891234"},
new Object[] {1017, "user_18", "Shanghai", "123567891234"},
new Object[] {1018, "user_19", "Shanghai", "123567891234"},
new Object[] {1019, "user_20", "Shanghai", "123567891234"},
new Object[] {2000, "user_21", "Shanghai", "123567891234"}));
List<DataChangeEvent> expectedSnapshotResults =
snapshotRecords.stream()
.map(
record ->
DataChangeEvent.insertEvent(
tableId, record, Collections.emptyMap()))
.collect(Collectors.toList());
List<Event> snapshotResults = fetchResults(iterator, expectedSnapshotResults.size());
assertThat(snapshotResults).containsExactlyInAnyOrderElementsOf(expectedSnapshotResults);
try (JdbcConnection connection = DebeziumUtils.createMySqlConnection(sourceConfig)) {
connection.setAutoCommit(false);
connection.execute(
"INSERT INTO "
+ tableId
+ " VALUES(2001, 'user_22','Shanghai','123567891234'),"
+ " (2002, 'user_23','Shanghai','123567891234'),"
+ "(2003, 'user_24','Shanghai','123567891234')");
connection.execute("DELETE FROM " + tableId + " where id = 101");
connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010");
connection.commit();
}
List<Event> expectedStreamRecords = new ArrayList<>();
List<BinaryRecordData> insertedRecords =
customerRecordData(
Arrays.asList(
new Object[] {2001, "user_22", "Shanghai", "123567891234"},
new Object[] {2002, "user_23", "Shanghai", "123567891234"},
new Object[] {2003, "user_24", "Shanghai", "123567891234"}));
expectedStreamRecords.addAll(
insertedRecords.stream()
.map(
record ->
DataChangeEvent.insertEvent(
tableId, record, Collections.emptyMap()))
.collect(Collectors.toList()));
BinaryRecordData deletedRecord =
customerRecordData(new Object[] {101, "user_1", "Shanghai", "123567891234"});
expectedStreamRecords.add(
DataChangeEvent.deleteEvent(tableId, deletedRecord, Collections.emptyMap()));
BinaryRecordData updateBeforeRecord =
customerRecordData(new Object[] {1010, "user_11", "Shanghai", "123567891234"});
BinaryRecordData updateAfterRecord =
customerRecordData(new Object[] {1010, "user_11", "Hangzhou", "123567891234"});
expectedStreamRecords.add(
DataChangeEvent.updateEvent(
tableId, updateBeforeRecord, updateAfterRecord, Collections.emptyMap()));
List<Event> streamResults = fetchResults(iterator, expectedStreamRecords.size());
assertThat(streamResults).isEqualTo(expectedStreamRecords);
}
private MySqlSource<Event> getMySqlSource(MySqlSourceConfig sourceConfig) {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
ZoneId.of(sourceConfig.getServerTimeZone()),
sourceConfig.isIncludeSchemaChanges());
return MySqlSource.<Event>builder()
.hostname(sourceConfig.getHostname())
.port(sourceConfig.getPort())
.databaseList(sourceConfig.getDatabaseList().toArray(new String[0]))
.tableList(sourceConfig.getTableList().toArray(new String[0]))
.username(sourceConfig.getUsername())
.password(sourceConfig.getPassword())
.deserializer(deserializer)
.serverTimeZone(sourceConfig.getServerTimeZone())
.debeziumProperties(sourceConfig.getDbzProperties())
.build();
}
private MySqlSourceConfig getConfig(String[] captureTables) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
.toArray(String[]::new);
return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.latest())
.databaseList(customerDatabase.getDatabaseName())
.tableList(captureTableIds)
.includeSchemaChanges(false)
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.splitSize(10)
.fetchSize(2)
.username(customerDatabase.getUsername())
.password(customerDatabase.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
.createConfig(0);
}
private List<BinaryRecordData> customerRecordData(List<Object[]> records) {
return records.stream().map(this::customerRecordData).collect(Collectors.toList());
}
private BinaryRecordData customerRecordData(Object[] record) {
return generator.generate(
new Object[] {
record[0],
BinaryStringData.fromString((String) record[1]),
BinaryStringData.fromString((String) record[2]),
BinaryStringData.fromString((String) record[3])
});
}
private static List<Event> fetchResults(Iterator<Event> iter, int size) {
List<Event> result = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Event event = iter.next();
result.add(event);
size--;
}
return result;
}
}

@ -0,0 +1,326 @@
-- Copyright 2023 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: customer
-- ----------------------------------------------------------------------------------------------------------------
-- Create and populate our users using a single insert with many rows
CREATE TABLE customers (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234"),
(110,"user_5","Shanghai","123567891234"),
(111,"user_6","Shanghai","123567891234"),
(118,"user_7","Shanghai","123567891234"),
(121,"user_8","Shanghai","123567891234"),
(123,"user_9","Shanghai","123567891234"),
(1009,"user_10","Shanghai","123567891234"),
(1010,"user_11","Shanghai","123567891234"),
(1011,"user_12","Shanghai","123567891234"),
(1012,"user_13","Shanghai","123567891234"),
(1013,"user_14","Shanghai","123567891234"),
(1014,"user_15","Shanghai","123567891234"),
(1015,"user_16","Shanghai","123567891234"),
(1016,"user_17","Shanghai","123567891234"),
(1017,"user_18","Shanghai","123567891234"),
(1018,"user_19","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");
-- Create a table that will not be read
CREATE TABLE prefix_customers (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO prefix_customers
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234");
-- table whose name shares the 'customers' prefix and matches 'customers.*'
CREATE TABLE customers_1 (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers_1
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234"),
(110,"user_5","Shanghai","123567891234"),
(111,"user_6","Shanghai","123567891234"),
(118,"user_7","Shanghai","123567891234"),
(121,"user_8","Shanghai","123567891234"),
(123,"user_9","Shanghai","123567891234"),
(1009,"user_10","Shanghai","123567891234"),
(1010,"user_11","Shanghai","123567891234"),
(1011,"user_12","Shanghai","123567891234"),
(1012,"user_13","Shanghai","123567891234"),
(1013,"user_14","Shanghai","123567891234"),
(1014,"user_15","Shanghai","123567891234"),
(1015,"user_16","Shanghai","123567891234"),
(1016,"user_17","Shanghai","123567891234"),
(1017,"user_18","Shanghai","123567891234"),
(1018,"user_19","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");
-- create table whose split key is evenly distributed
CREATE TABLE customers_even_dist (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers_even_dist
VALUES (101,'user_1','Shanghai','123567891234'),
(102,'user_2','Shanghai','123567891234'),
(103,'user_3','Shanghai','123567891234'),
(104,'user_4','Shanghai','123567891234'),
(105,'user_5','Shanghai','123567891234'),
(106,'user_6','Shanghai','123567891234'),
(107,'user_7','Shanghai','123567891234'),
(108,'user_8','Shanghai','123567891234'),
(109,'user_9','Shanghai','123567891234'),
(110,'user_10','Shanghai','123567891234');
-- create table whose split key is evenly distributed and sparse
CREATE TABLE customers_sparse_dist (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers_sparse_dist
VALUES (2,'user_1','Shanghai','123567891234'),
(4,'user_2','Shanghai','123567891234'),
(6,'user_3','Shanghai','123567891234'),
(8,'user_4','Shanghai','123567891234'),
(10,'user_5','Shanghai','123567891234'),
(16,'user_6','Shanghai','123567891234'),
(17,'user_7','Shanghai','123567891234'),
(18,'user_8','Shanghai','123567891234'),
(20,'user_9','Shanghai','123567891234'),
(22,'user_10','Shanghai','123567891234');
-- create table whose split key is evenly distributed and dense
CREATE TABLE customers_dense_dist (
id1 INTEGER NOT NULL,
id2 VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512),
PRIMARY KEY(id1, id2)
);
INSERT INTO customers_dense_dist
VALUES (1,'user_1','Shanghai','123567891234'),
(1,'user_2','Shanghai','123567891234'),
(1,'user_3','Shanghai','123567891234'),
(1,'user_4','Shanghai','123567891234'),
(2,'user_5','Shanghai','123567891234'),
(2,'user_6','Shanghai','123567891234'),
(2,'user_7','Shanghai','123567891234'),
(3,'user_8','Shanghai','123567891234'),
(3,'user_9','Shanghai','123567891234'),
(3,'user_10','Shanghai','123567891234');
CREATE TABLE customers_no_pk (
id INTEGER,
name VARCHAR(255) DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers_no_pk
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234"),
(110,"user_5","Shanghai","123567891234"),
(111,"user_6","Shanghai","123567891234"),
(118,"user_7","Shanghai","123567891234"),
(121,"user_8","Shanghai","123567891234"),
(123,"user_9","Shanghai","123567891234"),
(1009,"user_10","Shanghai","123567891234"),
(1010,"user_11","Shanghai","123567891234"),
(1011,"user_12","Shanghai","123567891234"),
(1012,"user_13","Shanghai","123567891234"),
(1013,"user_14","Shanghai","123567891234"),
(1014,"user_15","Shanghai","123567891234"),
(1015,"user_16","Shanghai","123567891234"),
(1016,"user_17","Shanghai","123567891234"),
(1017,"user_18","Shanghai","123567891234"),
(1018,"user_19","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");
-- table has combined primary key
CREATE TABLE customer_card (
card_no BIGINT NOT NULL,
level VARCHAR(10) NOT NULL,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
note VARCHAR(1024),
PRIMARY KEY(card_no, level)
);
insert into customer_card
VALUES (20001, 'LEVEL_4', 'user_1', 'user with level 4'),
(20002, 'LEVEL_4', 'user_2', 'user with level 4'),
(20003, 'LEVEL_4', 'user_3', 'user with level 4'),
(20004, 'LEVEL_4', 'user_4', 'user with level 4'),
(20004, 'LEVEL_1', 'user_4', 'user with level 4'),
(20004, 'LEVEL_2', 'user_4', 'user with level 4'),
(20004, 'LEVEL_3', 'user_4', 'user with level 4'),
(30006, 'LEVEL_3', 'user_5', 'user with level 3'),
(30007, 'LEVEL_3', 'user_6', 'user with level 3'),
(30008, 'LEVEL_3', 'user_7', 'user with level 3'),
(30009, 'LEVEL_3', 'user_8', 'user with level 3'),
(30009, 'LEVEL_2', 'user_8', 'user with level 3'),
(30009, 'LEVEL_1', 'user_8', 'user with level 3'),
(40001, 'LEVEL_2', 'user_9', 'user with level 2'),
(40002, 'LEVEL_2', 'user_10', 'user with level 2'),
(40003, 'LEVEL_2', 'user_11', 'user with level 2'),
(50001, 'LEVEL_1', 'user_12', 'user with level 1'),
(50002, 'LEVEL_1', 'user_13', 'user with level 1'),
(50003, 'LEVEL_1', 'user_14', 'user with level 1');
-- table has a single row
CREATE TABLE customer_card_single_line (
card_no BIGINT NOT NULL,
level VARCHAR(10) NOT NULL,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
note VARCHAR(1024),
PRIMARY KEY(card_no, level)
);
insert into customer_card_single_line
VALUES (20001, 'LEVEL_1', 'user_1', 'user with level 1');
-- table has combined primary key
CREATE TABLE shopping_cart (
product_no INT NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL,
PRIMARY KEY(user_id, product_no, product_kind)
);
insert into shopping_cart
VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'),
(101, 'KIND_002', 'user_1', 'my shopping cart'),
(102, 'KIND_007', 'user_1', 'my shopping cart'),
(102, 'KIND_008', 'user_1', 'my shopping cart'),
(501, 'KIND_100', 'user_2', 'my shopping list'),
(701, 'KIND_999', 'user_3', 'my shopping list'),
(801, 'KIND_010', 'user_4', 'my shopping list'),
(600, 'KIND_009', 'user_4', 'my shopping list'),
(401, 'KIND_002', 'user_5', 'leo list'),
(401, 'KIND_007', 'user_5', 'leo list'),
(404, 'KIND_008', 'user_5', 'leo list'),
(600, 'KIND_009', 'user_6', 'my shopping cart');
-- table has a combined primary key and one of the primary key columns is evenly distributed
CREATE TABLE evenly_shopping_cart (
product_no INT NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL,
PRIMARY KEY(product_kind, product_no, user_id)
);
insert into evenly_shopping_cart
VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'),
(102, 'KIND_002', 'user_1', 'my shopping cart'),
(103, 'KIND_007', 'user_1', 'my shopping cart'),
(104, 'KIND_008', 'user_1', 'my shopping cart'),
(105, 'KIND_100', 'user_2', 'my shopping list'),
(105, 'KIND_999', 'user_3', 'my shopping list'),
(107, 'KIND_010', 'user_4', 'my shopping list'),
(108, 'KIND_009', 'user_4', 'my shopping list'),
(109, 'KIND_002', 'user_5', 'leo list'),
(111, 'KIND_007', 'user_5', 'leo list'),
(111, 'KIND_008', 'user_5', 'leo list'),
(112, 'KIND_009', 'user_6', 'my shopping cart');
-- table has bigint unsigned auto increment primary key
CREATE TABLE shopping_cart_big (
product_no BIGINT UNSIGNED AUTO_INCREMENT NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL,
PRIMARY KEY(product_no)
);
insert into shopping_cart_big
VALUES (default, 'KIND_001', 'user_1', 'my shopping cart'),
(default, 'KIND_002', 'user_1', 'my shopping cart'),
(default, 'KIND_003', 'user_1', 'my shopping cart');
-- table has decimal primary key
CREATE TABLE shopping_cart_dec (
product_no DECIMAL(10, 4) NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) DEFAULT 'flink',
PRIMARY KEY(product_no)
);
insert into shopping_cart_dec
VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'),
(123457.456, 'KIND_002', 'user_2', 'my shopping cart'),
(123458.6789, 'KIND_003', 'user_3', 'my shopping cart'),
(123459.1234, 'KIND_004', 'user_4', null);
-- create table whose primary key is produced by the snowflake algorithm
CREATE TABLE address (
id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
country VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
detail_address VARCHAR(1024)
);
INSERT INTO address
VALUES (416874195632735147, 'China', 'Beijing', 'West Town address 1'),
(416927583791428523, 'China', 'Beijing', 'West Town address 2'),
(417022095255614379, 'China', 'Beijing', 'West Town address 3'),
(417111867899200427, 'America', 'New York', 'East Town address 1'),
(417271541558096811, 'America', 'New York', 'East Town address 2'),
(417272886855938987, 'America', 'New York', 'East Town address 3'),
(417420106184475563, 'Germany', 'Berlin', 'West Town address 1'),
(418161258277847979, 'Germany', 'Berlin', 'West Town address 2');
CREATE TABLE default_value_test (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number INTEGER DEFAULT ' 123 '
);
INSERT INTO default_value_test
VALUES (1,'user1','Shanghai',123567),
(2,'user2','Shanghai',123567);

@ -0,0 +1,64 @@
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
secure-file-priv=/var/lib/mysql
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
#log-error=/var/log/mysqld.log
#pid-file=/var/run/mysqld/mysqld.pid
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems.
server-id = 223344
log_bin = mysql-bin
binlog_format = row
# Set binlog_expire_logs_seconds = 1 and max_binlog_size = 4096 to test the exception
# message raised when the binlog expires on the server.
binlog_expire_logs_seconds = 1
max_binlog_size = 4096
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on

@ -0,0 +1,61 @@
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
secure-file-priv=/var/lib/mysql
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
#log-error=/var/log/mysqld.log
#pid-file=/var/run/mysqld/mysqld.pid
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on

@ -0,0 +1,28 @@
-- Copyright 2023 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- In production you would almost certainly limit the replication user to the follower (slave) machine,
-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
-- However, in this database we'll grant 2 users different privileges:
--
-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
-- 2) 'mysqluser' - all privileges
--
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: emptydb
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE emptydb;

@ -0,0 +1,26 @@
################################################################################
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level=INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n

@ -0,0 +1,29 @@
################################################################################
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: root
password: root
tables: test.\.*
sink:
type: values
name: Values Sink
pipeline:
pipeline.global.parallelism: 1

@ -1,9 +1,8 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#

@ -27,6 +27,7 @@ under the License.
<packaging>pom</packaging>
<modules>
<module>flink-cdc-pipeline-connector-values</module>
<module>flink-cdc-pipeline-connector-mysql</module>
</modules>
<dependencies>

@ -0,0 +1,67 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium.event;
import com.ververica.cdc.common.types.DataField;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import org.apache.kafka.connect.data.Schema;
/** Utility class to convert {@link Schema} to {@link DataType}. */
public class ConnectSchemaTypeInference {
public static DataType infer(Schema schema) {
return schema.isOptional()
? infer(schema, schema.type())
: infer(schema, schema.type()).notNull();
}
private static DataType infer(Schema schema, Schema.Type type) {
switch (type) {
case INT8:
return DataTypes.TINYINT();
case INT16:
return DataTypes.SMALLINT();
case INT32:
return DataTypes.INT();
case INT64:
return DataTypes.BIGINT();
case FLOAT32:
return DataTypes.FLOAT();
case FLOAT64:
return DataTypes.DOUBLE();
case BOOLEAN:
return DataTypes.BOOLEAN();
case STRING:
return DataTypes.STRING();
case BYTES:
return DataTypes.BYTES();
case ARRAY:
return DataTypes.ARRAY(infer(schema.valueSchema()));
case MAP:
return DataTypes.MAP(infer(schema.keySchema()), infer(schema.valueSchema()));
case STRUCT:
return DataTypes.ROW(
schema.fields().stream()
.map(f -> DataTypes.FIELD(f.name(), infer(f.schema())))
.toArray(DataField[]::new));
default:
throw new UnsupportedOperationException(
"Unsupported type: " + schema.type().getName());
}
}
}
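A quick way to sanity-check the mapping above is to run infer over a hand-built Connect schema. The following is a usage sketch only; the class name and the printed expectation are illustrative and not part of this change:
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.debezium.event.ConnectSchemaTypeInference;
public class ConnectSchemaTypeInferenceExample {
    public static void main(String[] args) {
        // A non-optional STRUCT with a mandatory INT32 field and an optional STRING field.
        Schema connectSchema =
                SchemaBuilder.struct()
                        .field("id", Schema.INT32_SCHEMA)
                        .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                        .build();
        // Per the switch above, this should yield a non-null ROW type whose
        // `id` field is INT NOT NULL and whose `name` field is a nullable STRING.
        DataType rowType = ConnectSchemaTypeInference.infer(connectSchema);
        System.out.println(rowType);
    }
}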

@ -0,0 +1,539 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium.event;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.DecimalData;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.data.TimestampData;
import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.types.DataField;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DecimalType;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import com.ververica.cdc.runtime.typeutils.EventTypeInfo;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** Debezium event deserializer for {@link SourceRecord}. */
@Internal
public abstract class DebeziumEventDeserializationSchema extends SourceRecordEventDeserializer
implements DebeziumDeserializationSchema<Event> {
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(DebeziumEventDeserializationSchema.class);
private static final Map<DataType, DeserializationRuntimeConverter> CONVERTERS =
new ConcurrentHashMap<>();
/** Changelog Mode to use for encoding changes in Flink internal data structure. */
protected final DebeziumChangelogMode changelogMode;
/** The session time zone in database server. */
protected final ZoneId serverTimeZone;
public DebeziumEventDeserializationSchema(
DebeziumChangelogMode changelogMode, ZoneId serverTimeZone) {
this.changelogMode = changelogMode;
this.serverTimeZone = serverTimeZone;
}
@Override
public void deserialize(SourceRecord record, Collector<Event> out) throws Exception {
deserialize(record).forEach(out::collect);
}
@Override
public List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
TableId tableId = getTableId(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
Map<String, String> meta = getMetadata(record);
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
RecordData after = extractAfterDataRecord(value, valueSchema);
return Collections.singletonList(DataChangeEvent.insertEvent(tableId, after, meta));
} else if (op == Envelope.Operation.DELETE) {
RecordData before = extractBeforeDataRecord(value, valueSchema);
return Collections.singletonList(DataChangeEvent.deleteEvent(tableId, before, meta));
} else if (op == Envelope.Operation.UPDATE) {
RecordData after = extractAfterDataRecord(value, valueSchema);
if (changelogMode == DebeziumChangelogMode.ALL) {
RecordData before = extractBeforeDataRecord(value, valueSchema);
return Collections.singletonList(
DataChangeEvent.updateEvent(tableId, before, after, meta));
}
return Collections.singletonList(
DataChangeEvent.updateEvent(tableId, null, after, meta));
} else {
LOG.trace("Received {} operation, skip", op);
return Collections.emptyList();
}
}
@Override
public TypeInformation<Event> getProducedType() {
return new EventTypeInfo();
}
private RecordData extractBeforeDataRecord(Struct value, Schema valueSchema) throws Exception {
Schema beforeSchema = fieldSchema(valueSchema, Envelope.FieldName.BEFORE);
Struct beforeValue = fieldStruct(value, Envelope.FieldName.BEFORE);
return extractDataRecord(beforeValue, beforeSchema);
}
private RecordData extractAfterDataRecord(Struct value, Schema valueSchema) throws Exception {
Schema afterSchema = fieldSchema(valueSchema, Envelope.FieldName.AFTER);
Struct afterValue = fieldStruct(value, Envelope.FieldName.AFTER);
return extractDataRecord(afterValue, afterSchema);
}
private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
DataType dataType = ConnectSchemaTypeInference.infer(valueSchema);
return (RecordData)
getOrCreateConverter(dataType, serverTimeZone).convert(value, valueSchema);
}
private static DeserializationRuntimeConverter getOrCreateConverter(
DataType type, ZoneId serverTimeZone) {
return CONVERTERS.computeIfAbsent(type, t -> createConverter(t, serverTimeZone));
}
// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------
/** Creates a runtime converter which is null safe. */
private static DeserializationRuntimeConverter createConverter(
DataType type, ZoneId serverTimeZone) {
return wrapIntoNullableConverter(createNotNullConverter(type, serverTimeZone));
}
// --------------------------------------------------------------------------------
// IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
// necessary because the maven shade plugin cannot relocate classes in
// SerializedLambdas (MSHADE-260).
// --------------------------------------------------------------------------------
/** Creates a runtime converter which assumes the input object is not null. */
public static DeserializationRuntimeConverter createNotNullConverter(
DataType type, ZoneId serverTimeZone) {
// if no matched user defined converter, fallback to the default converter
switch (type.getTypeRoot()) {
case BOOLEAN:
return convertToBoolean();
case TINYINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Byte.parseByte(dbzObj.toString());
}
};
case SMALLINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Short.parseShort(dbzObj.toString());
}
};
case INTEGER:
return convertToInt();
case BIGINT:
return convertToLong();
case DATE:
return convertToDate();
case TIME_WITHOUT_TIME_ZONE:
return convertToTime();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return convertToTimestamp(serverTimeZone);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return convertToLocalTimeZoneTimestamp(serverTimeZone);
case FLOAT:
return convertToFloat();
case DOUBLE:
return convertToDouble();
case CHAR:
case VARCHAR:
return convertToString();
case BINARY:
case VARBINARY:
return convertToBinary();
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ROW:
return createRowConverter((RowType) type, serverTimeZone);
case ARRAY:
case MAP:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
private static DeserializationRuntimeConverter convertToBoolean() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Boolean) {
return dbzObj;
} else if (dbzObj instanceof Byte) {
return (byte) dbzObj == 1;
} else if (dbzObj instanceof Short) {
return (short) dbzObj == 1;
} else {
return Boolean.parseBoolean(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToInt() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return dbzObj;
} else if (dbzObj instanceof Long) {
return ((Long) dbzObj).intValue();
} else {
return Integer.parseInt(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToLong() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return ((Integer) dbzObj).longValue();
} else if (dbzObj instanceof Long) {
return dbzObj;
} else {
return Long.parseLong(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToDouble() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return ((Float) dbzObj).doubleValue();
} else if (dbzObj instanceof Double) {
return dbzObj;
} else {
return Double.parseDouble(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToFloat() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return dbzObj;
} else if (dbzObj instanceof Double) {
return ((Double) dbzObj).floatValue();
} else {
return Float.parseFloat(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToDate() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
}
};
}
private static DeserializationRuntimeConverter convertToTime() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case MicroTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000);
case NanoTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000_000);
}
} else if (dbzObj instanceof Integer) {
return dbzObj;
}
// get number of milliseconds of the day
return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
}
};
}
private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromMillis(
micro / 1000, (int) (micro % 1000 * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromMillis(
nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime =
TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
}
};
}
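    // Worked example (illustrative values only, not taken from any test):
    // a Debezium MicroTimestamp of 1_700_000_000_123_456 microseconds is split as
    //   millis             = 1_700_000_000_123_456 / 1000           = 1_700_000_000_123
    //   nanosOfMillisecond = (1_700_000_000_123_456 % 1000) * 1000  = 456_000
    // before being handed to TimestampData.fromMillis(millis, nanosOfMillisecond).
    // NanoTimestamp values are split analogously with a 1_000_000 divisor, where the
    // remainder is already in nanoseconds and needs no extra scaling.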
private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
// TIMESTAMP_LTZ type is encoded in string type
Instant instant = Instant.parse(str);
return TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(instant, serverTimeZone));
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}
};
}
private static DeserializationRuntimeConverter convertToString() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return BinaryStringData.fromString(dbzObj.toString());
}
};
}
private static DeserializationRuntimeConverter convertToBinary() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof byte[]) {
return dbzObj;
} else if (dbzObj instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
} else {
throw new UnsupportedOperationException(
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
}
}
};
}
private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
BigDecimal bigDecimal;
if (dbzObj instanceof byte[]) {
// decimal.handling.mode=precise
bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
} else if (dbzObj instanceof String) {
// decimal.handling.mode=string
bigDecimal = new BigDecimal((String) dbzObj);
} else if (dbzObj instanceof Double) {
// decimal.handling.mode=double
bigDecimal = BigDecimal.valueOf((Double) dbzObj);
} else {
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
SpecialValueDecimal decimal =
VariableScaleDecimal.toLogical((Struct) dbzObj);
bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
} else {
// fallback to string
bigDecimal = new BigDecimal(dbzObj.toString());
}
}
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
};
}
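    // Worked example: with decimal.handling.mode=string and a column declared
    // DECIMAL(10, 4) (as in shopping_cart_dec in the test data), the payload
    // "123458.6789" becomes new BigDecimal("123458.6789") and is wrapped via
    // DecimalData.fromBigDecimal(bigDecimal, 10, 4).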
private static DeserializationRuntimeConverter createRowConverter(
RowType rowType, ZoneId serverTimeZone) {
final DeserializationRuntimeConverter[] fieldConverters =
rowType.getFields().stream()
.map(DataField::getType)
.map(logicType -> createConverter(logicType, serverTimeZone))
.toArray(DeserializationRuntimeConverter[]::new);
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
final BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
Struct struct = (Struct) dbzObj;
int arity = fieldNames.length;
Object[] fields = new Object[arity];
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
Field field = schema.field(fieldName);
if (field == null) {
fields[i] = null;
} else {
Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
Object convertedField =
convertField(fieldConverters[i], fieldValue, fieldSchema);
fields[i] = convertedField;
}
}
return generator.generate(fields);
}
};
}
private static Object convertField(
DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
throws Exception {
if (fieldValue == null) {
return null;
} else {
return fieldConverter.convert(fieldValue, fieldSchema);
}
}
private static DeserializationRuntimeConverter wrapIntoNullableConverter(
DeserializationRuntimeConverter converter) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
if (dbzObj == null) {
return null;
}
return converter.convert(dbzObj, schema);
}
};
}
}

@ -0,0 +1,84 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium.event;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.EventDeserializer;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/** Deserializer to deserialize {@link SourceRecord} to {@link Event}. */
@Internal
public abstract class SourceRecordEventDeserializer implements EventDeserializer<SourceRecord> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(SourceRecordEventDeserializer.class);
@Override
public List<? extends Event> deserialize(SourceRecord record) throws Exception {
if (isDataChangeRecord(record)) {
LOG.trace("Process data change record: {}", record);
return deserializeDataChangeRecord(record);
} else if (isSchemaChangeRecord(record)) {
LOG.trace("Process schema change record: {}", record);
return deserializeSchemaChangeRecord(record);
} else {
LOG.trace("Ignored other record: {}", record);
return Collections.emptyList();
}
}
/** Whether the given record is a data change record. */
protected abstract boolean isDataChangeRecord(SourceRecord record);
/** Whether the given record is a schema change record. */
protected abstract boolean isSchemaChangeRecord(SourceRecord record);
/** Deserialize given data change record to {@link DataChangeEvent}. */
protected abstract List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record)
throws Exception;
/** Deserialize given schema change record to {@link SchemaChangeEvent}. */
protected abstract List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record)
throws Exception;
/** Get {@link TableId} from given record. */
protected abstract TableId getTableId(SourceRecord record);
/** Get metadata from given record. */
protected abstract Map<String, String> getMetadata(SourceRecord record);
public static Schema fieldSchema(Schema schema, String fieldName) {
return schema.field(fieldName).schema();
}
public static Struct fieldStruct(Struct value, String fieldName) {
return value.getStruct(fieldName);
}
}
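For orientation, the sketch below shows a hypothetical minimal subclass wired to these extension points; the class name, package placement, and the constant table id are invented for illustration and are not part of this change:
package com.ververica.cdc.debezium.event;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/** Hypothetical no-op subclass, shown only to illustrate the extension points. */
public class NoopSourceRecordEventDeserializer extends SourceRecordEventDeserializer {
    private static final long serialVersionUID = 1L;
    @Override
    protected boolean isDataChangeRecord(SourceRecord record) {
        return false; // real implementations inspect the record's envelope/schema
    }
    @Override
    protected boolean isSchemaChangeRecord(SourceRecord record) {
        return false;
    }
    @Override
    protected List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record) {
        return Collections.emptyList();
    }
    @Override
    protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
        return Collections.emptyList();
    }
    @Override
    protected TableId getTableId(SourceRecord record) {
        return TableId.tableId("example_db", "example_table"); // illustrative only
    }
    @Override
    protected Map<String, String> getMetadata(SourceRecord record) {
        return Collections.emptyMap();
    }
}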