[mysql] Enable pass custom jdbc properties for debezium mysql connection (#1217)

This closes #674
pull/1366/head^2
Hang Ruan 3 years ago committed by GitHub
parent c96926b6ec
commit a5dd0c0a14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -66,7 +66,8 @@ public class MySqlValidator implements Validator {
connection = DebeziumUtils.openJdbcConnection(sourceConfig); connection = DebeziumUtils.openJdbcConnection(sourceConfig);
} else { } else {
// for the legacy source // for the legacy source
connection = DebeziumUtils.createMySqlConnection(from(dbzProperties)); connection =
DebeziumUtils.createMySqlConnection(from(dbzProperties), new Properties());
} }
checkVersion(connection); checkVersion(connection);
checkBinlogFormat(connection); checkBinlogFormat(connection);

@ -42,6 +42,7 @@ import java.sql.SQLException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import static com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables; import static com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables;
@ -66,9 +67,16 @@ public class DebeziumUtils {
} }
/** Creates a new {@link MySqlConnection}, but not open the connection. */ /** Creates a new {@link MySqlConnection}, but not open the connection. */
public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) { public static MySqlConnection createMySqlConnection(MySqlSourceConfig sourceConfig) {
return createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties());
}
/** Creates a new {@link MySqlConnection}, but not open the connection. */
public static MySqlConnection createMySqlConnection(
Configuration dbzConfiguration, Properties jdbcProperties) {
return new MySqlConnection( return new MySqlConnection(
new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration)); new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration, jdbcProperties));
} }
/** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */ /** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */

@ -202,8 +202,7 @@ public class MySqlSourceReader<T>
private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) { private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
final String splitId = split.splitId(); final String splitId = split.splitId();
if (split.getTableSchemas().isEmpty()) { if (split.getTableSchemas().isEmpty()) {
try (MySqlConnection jdbc = try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
Map<TableId, TableChanges.TableChange> tableSchemas = Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc); TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
LOG.info("The table schema discovery for binlog split {} success", splitId); LOG.info("The table schema discovery for binlog split {} success", splitId);

@ -135,8 +135,7 @@ public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
currentReader = null; currentReader = null;
} }
if (currentReader == null) { if (currentReader == null) {
final MySqlConnection jdbcConnection = final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
createMySqlConnection(sourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient = final BinaryLogClient binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration()); createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext = final StatefulTaskContext statefulTaskContext =
@ -149,8 +148,7 @@ public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
LOG.info("It's turn to read binlog split, close current snapshot reader"); LOG.info("It's turn to read binlog split, close current snapshot reader");
currentReader.close(); currentReader.close();
} }
final MySqlConnection jdbcConnection = final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
createMySqlConnection(sourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient = final BinaryLogClient binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration()); createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext = final StatefulTaskContext statefulTaskContext =

@ -0,0 +1,746 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode;
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.legacy.MySqlJdbcContext.DatabaseLocales;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Properties;
/**
* Copied from Debezium project(1.6.4.final) to add custom jdbc properties in the jdbc url. The new
* parameter {@code jdbcProperties} in the constructor of {@link MySqlConnectionConfiguration} will
* be used to generate the jdbc url pattern, and may overwrite the default value.
*
* <p>Line 71: Add field {@code urlPattern} in {@link MySqlConnection} and remove old pattern.
*
* <p>Line 84: Init {@code urlPattern} using the url pattern from {@link
* MySqlConnectionConfiguration}.
*
* <p>Line 552: Generate the connection string by the new field {@code urlPattern}.
*
* <p>Line 566 ~ 577: Add new constant and field {@code urlPattern} to {@link
* MySqlConnectionConfiguration}.
*
* <p>Line 622 ~ 625: Init new field {@code urlPattern} in {@link MySqlConnectionConfiguration}.
*
* <p>Line 686 ~ 716: Add some methods helping to generate the url pattern and add default values.
*/
public class MySqlConnection extends JdbcConnection {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlConnection.class);
private static final String SQL_SHOW_SYSTEM_VARIABLES = "SHOW VARIABLES";
private static final String SQL_SHOW_SYSTEM_VARIABLES_CHARACTER_SET =
"SHOW VARIABLES WHERE Variable_name IN ('character_set_server','collation_server')";
private static final String SQL_SHOW_SESSION_VARIABLE_SSL_VERSION =
"SHOW SESSION STATUS LIKE 'Ssl_version'";
private final Map<String, String> originalSystemProperties = new HashMap<>();
private final MySqlConnectionConfiguration connectionConfig;
private final MysqlFieldReader mysqlFieldReader;
private final String urlPattern;
/**
* Creates a new connection using the supplied configuration.
*
* @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null.
* @param fieldReader binary or text protocol based readers
*/
public MySqlConnection(
MySqlConnectionConfiguration connectionConfig, MysqlFieldReader fieldReader) {
super(connectionConfig.config(), connectionConfig.factory());
this.connectionConfig = connectionConfig;
this.mysqlFieldReader = fieldReader;
this.urlPattern = connectionConfig.getUrlPattern();
}
/**
* Creates a new connection using the supplied configuration.
*
* @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null.
*/
public MySqlConnection(MySqlConnectionConfiguration connectionConfig) {
this(connectionConfig, new MysqlTextProtocolFieldReader());
}
@Override
public synchronized Connection connection(boolean executeOnConnect) throws SQLException {
if (!isConnected() && connectionConfig.sslModeEnabled()) {
originalSystemProperties.clear();
// Set the System properties for SSL for the MySQL driver ...
setSystemProperty("javax.net.ssl.keyStore", MySqlConnectorConfig.SSL_KEYSTORE, true);
setSystemProperty(
"javax.net.ssl.keyStorePassword",
MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD,
false);
setSystemProperty(
"javax.net.ssl.trustStore", MySqlConnectorConfig.SSL_TRUSTSTORE, true);
setSystemProperty(
"javax.net.ssl.trustStorePassword",
MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD,
false);
}
return super.connection(executeOnConnect);
}
@Override
public void close() throws SQLException {
try {
super.close();
} finally {
// Reset the system properties to their original value ...
originalSystemProperties.forEach(
(name, value) -> {
if (value != null) {
System.setProperty(name, value);
} else {
System.clearProperty(name);
}
});
}
}
/**
* Read the MySQL charset-related system variables.
*
* @return the system variables that are related to server character sets; never null
*/
protected Map<String, String> readMySqlCharsetSystemVariables() {
// Read the system variables from the MySQL instance and get the current database name ...
LOGGER.debug("Reading MySQL charset-related system variables before parsing DDL history.");
return querySystemVariables(SQL_SHOW_SYSTEM_VARIABLES_CHARACTER_SET);
}
/**
* Read the MySQL system variables.
*
* @return the system variables that are related to server character sets; never null
*/
protected Map<String, String> readMySqlSystemVariables() {
// Read the system variables from the MySQL instance and get the current database name ...
LOGGER.debug("Reading MySQL system variables");
return querySystemVariables(SQL_SHOW_SYSTEM_VARIABLES);
}
private Map<String, String> querySystemVariables(String statement) {
final Map<String, String> variables = new HashMap<>();
try {
query(
statement,
rs -> {
while (rs.next()) {
String varName = rs.getString(1);
String value = rs.getString(2);
if (varName != null && value != null) {
variables.put(varName, value);
LOGGER.debug(
"\t{} = {}",
Strings.pad(varName, 45, ' '),
Strings.pad(value, 45, ' '));
}
}
});
} catch (SQLException e) {
throw new DebeziumException("Error reading MySQL variables: " + e.getMessage(), e);
}
return variables;
}
protected String setStatementFor(Map<String, String> variables) {
StringBuilder sb = new StringBuilder("SET ");
boolean first = true;
List<String> varNames = new ArrayList<>(variables.keySet());
Collections.sort(varNames);
for (String varName : varNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append(varName).append("=");
String value = variables.get(varName);
if (value == null) {
value = "";
}
if (value.contains(",") || value.contains(";")) {
value = "'" + value + "'";
}
sb.append(value);
}
return sb.append(";").toString();
}
protected void setSystemProperty(String property, Field field, boolean showValueInError) {
String value = connectionConfig.config().getString(field);
if (value != null) {
value = value.trim();
String existingValue = System.getProperty(property);
if (existingValue == null) {
// There was no existing property ...
String existing = System.setProperty(property, value);
originalSystemProperties.put(property, existing); // the existing value may be null
} else {
existingValue = existingValue.trim();
if (!existingValue.equalsIgnoreCase(value)) {
// There was an existing property, and the value is different ...
String msg =
"System or JVM property '"
+ property
+ "' is already defined, but the configuration property '"
+ field.name()
+ "' defines a different value";
if (showValueInError) {
msg =
"System or JVM property '"
+ property
+ "' is already defined as "
+ existingValue
+ ", but the configuration property '"
+ field.name()
+ "' defines a different value '"
+ value
+ "'";
}
throw new DebeziumException(msg);
}
// Otherwise, there was an existing property, and the value is exactly the same (so
// do nothing!)
}
}
}
/**
* Read the Ssl Version session variable.
*
* @return the session variables that are related to sessions ssl version
*/
protected String getSessionVariableForSslVersion() {
final String sslVersion = "Ssl_version";
LOGGER.debug("Reading MySQL Session variable for Ssl Version");
Map<String, String> sessionVariables =
querySystemVariables(SQL_SHOW_SESSION_VARIABLE_SSL_VERSION);
if (!sessionVariables.isEmpty() && sessionVariables.containsKey(sslVersion)) {
return sessionVariables.get(sslVersion);
}
return null;
}
/**
* Determine whether the MySQL server has GTIDs enabled.
*
* @return {@code false} if the server's {@code gtid_mode} is set and is {@code OFF}, or {@code
* true} otherwise
*/
public boolean isGtidModeEnabled() {
try {
return queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'",
rs -> {
if (rs.next()) {
return !"OFF".equalsIgnoreCase(rs.getString(2));
}
return false;
});
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
}
}
/**
* Determine the executed GTID set for MySQL.
*
* @return the string representation of MySQL's GTID sets; never null but an empty string if the
* server does not use GTIDs
*/
public String knownGtidSet() {
try {
return queryAndMap(
"SHOW MASTER STATUS",
rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
return rs.getString(
5); // GTID set, may be null, blank, or contain a GTID set
}
return "";
});
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
}
}
/**
* Determine the difference between two sets.
*
* @return a subtraction of two GTID sets; never null
*/
public GtidSet subtractGtidSet(GtidSet set1, GtidSet set2) {
try {
return prepareQueryAndMap(
"SELECT GTID_SUBTRACT(?, ?)",
ps -> {
ps.setString(1, set1.toString());
ps.setString(2, set2.toString());
},
rs -> {
if (rs.next()) {
return new GtidSet(rs.getString(1));
}
return new GtidSet("");
});
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
}
}
/**
* Get the purged GTID values from MySQL (gtid_purged value).
*
* @return A GTID set; may be empty if not using GTIDs or none have been purged yet
*/
public GtidSet purgedGtidSet() {
try {
return queryAndMap(
"SELECT @@global.gtid_purged",
rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 0) {
return new GtidSet(
rs.getString(
1)); // GTID set, may be null, blank, or contain a GTID
// set
}
return new GtidSet("");
});
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking at gtid_purged variable: ",
e);
}
}
/**
* Determine if the current user has the named privilege. Note that if the user has the "ALL"
* privilege this method returns {@code true}.
*
* @param grantName the name of the MySQL privilege; may not be null
* @return {@code true} if the user has the named privilege, or {@code false} otherwise
*/
public boolean userHasPrivileges(String grantName) {
try {
return queryAndMap(
"SHOW GRANTS FOR CURRENT_USER",
rs -> {
while (rs.next()) {
String grants = rs.getString(1);
LOGGER.debug(grants);
if (grants == null) {
return false;
}
grants = grants.toUpperCase();
if (grants.contains("ALL")
|| grants.contains(grantName.toUpperCase())) {
return true;
}
}
return false;
});
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking at privileges for current user: ",
e);
}
}
/**
* Determine the earliest binlog filename that is still available in the server.
*
* @return the name of the earliest binlog filename, or null if there are none.
*/
public String earliestBinlogFilename() {
// Accumulate the available binlog filenames ...
List<String> logNames = new ArrayList<>();
try {
LOGGER.info("Checking all known binlogs from MySQL");
query(
"SHOW BINARY LOGS",
rs -> {
while (rs.next()) {
logNames.add(rs.getString(1));
}
});
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking for binary logs: ", e);
}
if (logNames.isEmpty()) {
return null;
}
return logNames.get(0);
}
/**
* Determine whether the MySQL server has the binlog_row_image set to 'FULL'.
*
* @return {@code true} if the server's {@code binlog_row_image} is set to {@code FULL}, or
* {@code false} otherwise
*/
protected boolean isBinlogRowImageFull() {
try {
final String rowImage =
queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'",
rs -> {
if (rs.next()) {
return rs.getString(2);
}
// This setting was introduced in MySQL 5.6+ with default of 'FULL'.
// For older versions, assume 'FULL'.
return "FULL";
});
LOGGER.debug("binlog_row_image={}", rowImage);
return "FULL".equalsIgnoreCase(rowImage);
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking at BINLOG_ROW_IMAGE mode: ",
e);
}
}
/**
* Determine whether the MySQL server has the row-level binlog enabled.
*
* @return {@code true} if the server's {@code binlog_format} is set to {@code ROW}, or {@code
* false} otherwise
*/
protected boolean isBinlogFormatRow() {
try {
final String mode =
queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'binlog_format'",
rs -> rs.next() ? rs.getString(2) : "");
LOGGER.debug("binlog_format={}", mode);
return "ROW".equalsIgnoreCase(mode);
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode: ",
e);
}
}
/**
* Query the database server to get the list of the binlog files availble.
*
* @return list of the binlog files
*/
public List<String> availableBinlogFiles() {
List<String> logNames = new ArrayList<>();
try {
LOGGER.info("Get all known binlogs from MySQL");
query(
"SHOW BINARY LOGS",
rs -> {
while (rs.next()) {
logNames.add(rs.getString(1));
}
});
return logNames;
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking for binary logs: ", e);
}
}
public OptionalLong getEstimatedTableSize(TableId tableId) {
try {
// Choose how we create statements based on the # of rows.
// This is approximate and less accurate then COUNT(*),
// but far more efficient for large InnoDB tables.
execute("USE `" + tableId.catalog() + "`;");
return queryAndMap(
"SHOW TABLE STATUS LIKE '" + tableId.table() + "';",
rs -> {
if (rs.next()) {
return OptionalLong.of((rs.getLong(5)));
}
return OptionalLong.empty();
});
} catch (SQLException e) {
LOGGER.debug(
"Error while getting number of rows in table {}: {}",
tableId,
e.getMessage(),
e);
}
return OptionalLong.empty();
}
public boolean isTableIdCaseSensitive() {
return !"0"
.equals(
readMySqlSystemVariables()
.get(MySqlSystemVariables.LOWER_CASE_TABLE_NAMES));
}
/**
* Read the MySQL default character sets for exisiting databases.
*
* @return the map of database names with their default character sets; never null
*/
protected Map<String, DatabaseLocales> readDatabaseCollations() {
LOGGER.debug("Reading default database charsets");
try {
return queryAndMap(
"SELECT schema_name, default_character_set_name, default_collation_name FROM information_schema.schemata",
rs -> {
final Map<String, DatabaseLocales> charsets = new HashMap<>();
while (rs.next()) {
String dbName = rs.getString(1);
String charset = rs.getString(2);
String collation = rs.getString(3);
if (dbName != null && (charset != null || collation != null)) {
charsets.put(dbName, new DatabaseLocales(charset, collation));
LOGGER.debug(
"\t{} = {}, {}",
Strings.pad(dbName, 45, ' '),
Strings.pad(charset, 45, ' '),
Strings.pad(collation, 45, ' '));
}
}
return charsets;
});
} catch (SQLException e) {
throw new DebeziumException(
"Error reading default database charsets: " + e.getMessage(), e);
}
}
public String connectionString() {
return connectionString(urlPattern);
}
/** Connection configuration to create a {@link MySqlConnection}. */
public static class MySqlConnectionConfiguration {
protected static final String JDBC_PROPERTY_LEGACY_DATETIME = "useLegacyDatetimeCode";
protected static final String JDBC_PROPERTY_CONNECTION_TIME_ZONE = "connectionTimeZone";
protected static final String JDBC_PROPERTY_LEGACY_SERVER_TIME_ZONE = "serverTimezone";
private final Configuration jdbcConfig;
private final ConnectionFactory factory;
private final Configuration config;
private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties();
private static final String JDBC_URL_PATTERN =
"jdbc:mysql://${hostname}:${port}/?useSSL=${useSSL}&connectTimeout=${connectTimeout}";
private static final String JDBC_URL_PATTERN_WITH_CUSTOM_USE_SSL =
"jdbc:mysql://${hostname}:${port}/?connectTimeout=${connectTimeout}";
private final String urlPattern;
public MySqlConnectionConfiguration(Configuration config) {
this(config, new Properties());
}
public MySqlConnectionConfiguration(Configuration config, Properties jdbcProperties) {
// Set up the JDBC connection without actually connecting, with extra MySQL-specific
// properties
// to give us better JDBC database metadata behavior, including using UTF-8 for the
// client-side character encoding
// per https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-charsets.html
this.config = config;
final boolean useSSL = sslModeEnabled();
final Configuration dbConfig =
config.filter(
x ->
!(x.startsWith(
DatabaseHistory
.CONFIGURATION_FIELD_PREFIX_STRING)
|| x.equals(
MySqlConnectorConfig.DATABASE_HISTORY
.name())))
.edit()
.withDefault(
MySqlConnectorConfig.PORT,
MySqlConnectorConfig.PORT.defaultValue())
.build()
.subset("database.", true);
final Builder jdbcConfigBuilder =
dbConfig.edit()
.with(
"connectTimeout",
Long.toString(getConnectionTimeout().toMillis()))
.with("useSSL", Boolean.toString(useSSL));
final String legacyDateTime = dbConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME);
if (legacyDateTime == null) {
jdbcConfigBuilder.with(JDBC_PROPERTY_LEGACY_DATETIME, "false");
} else if ("true".equals(legacyDateTime)) {
LOGGER.warn(
"'{}' is set to 'true'. This setting is not recommended and can result in timezone issues.",
JDBC_PROPERTY_LEGACY_DATETIME);
}
jdbcConfigBuilder.with(
JDBC_PROPERTY_CONNECTION_TIME_ZONE, determineConnectionTimeZone(dbConfig));
this.jdbcConfig = jdbcConfigBuilder.build();
String driverClassName = this.jdbcConfig.getString(MySqlConnectorConfig.JDBC_DRIVER);
this.urlPattern = formatJdbcUrl(jdbcProperties);
factory =
JdbcConnection.patternBasedFactory(
urlPattern, driverClassName, getClass().getClassLoader());
}
private static String determineConnectionTimeZone(final Configuration dbConfig) {
// Debezium by default expects timezoned data delivered in server timezone
String connectionTimeZone = dbConfig.getString(JDBC_PROPERTY_CONNECTION_TIME_ZONE);
if (connectionTimeZone != null) {
return connectionTimeZone;
}
// fall back to legacy property
final String serverTimeZone = dbConfig.getString(JDBC_PROPERTY_LEGACY_SERVER_TIME_ZONE);
if (serverTimeZone != null) {
LOGGER.warn(
"Database configuration option '{}' is set but is obsolete, please use '{}' instead",
JDBC_PROPERTY_LEGACY_SERVER_TIME_ZONE,
JDBC_PROPERTY_CONNECTION_TIME_ZONE);
connectionTimeZone = serverTimeZone;
}
return connectionTimeZone != null ? connectionTimeZone : "SERVER";
}
public Configuration config() {
return jdbcConfig;
}
public ConnectionFactory factory() {
return factory;
}
public String username() {
return config.getString(MySqlConnectorConfig.USER);
}
public String password() {
return config.getString(MySqlConnectorConfig.PASSWORD);
}
public String hostname() {
return config.getString(MySqlConnectorConfig.HOSTNAME);
}
public int port() {
return config.getInteger(MySqlConnectorConfig.PORT);
}
public SecureConnectionMode sslMode() {
String mode = config.getString(MySqlConnectorConfig.SSL_MODE);
return SecureConnectionMode.parse(mode);
}
public boolean sslModeEnabled() {
return sslMode() != SecureConnectionMode.DISABLED;
}
public Duration getConnectionTimeout() {
return Duration.ofMillis(config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS));
}
public String getUrlPattern() {
return urlPattern;
}
private String formatJdbcUrl(Properties jdbcProperties) {
Properties combinedProperties = new Properties();
combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES);
combinedProperties.putAll(jdbcProperties);
StringBuilder jdbcUrlStringBuilder =
jdbcProperties.getProperty("useSSL") == null
? new StringBuilder(JDBC_URL_PATTERN)
: new StringBuilder(JDBC_URL_PATTERN_WITH_CUSTOM_USE_SSL);
combinedProperties.forEach(
(key, value) -> {
jdbcUrlStringBuilder.append("&").append(key).append("=").append(value);
});
return jdbcUrlStringBuilder.toString();
}
private static Properties initializeDefaultJdbcProperties() {
Properties defaultJdbcProperties = new Properties();
defaultJdbcProperties.setProperty("useInformationSchema", "true");
defaultJdbcProperties.setProperty("nullCatalogMeansCurrent", "false");
defaultJdbcProperties.setProperty("useUnicode", "true");
defaultJdbcProperties.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL");
defaultJdbcProperties.setProperty("characterEncoding", "UTF-8");
defaultJdbcProperties.setProperty("characterSetResults", "UTF-8");
return defaultJdbcProperties;
}
public EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode() {
String mode =
config.getString(CommonConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE);
if (mode == null) {
mode =
config.getString(
MySqlConnectorConfig.EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE);
}
return EventProcessingFailureHandlingMode.parse(mode);
}
public EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode() {
String mode = config.getString(MySqlConnectorConfig.INCONSISTENT_SCHEMA_HANDLING_MODE);
return EventProcessingFailureHandlingMode.parse(mode);
}
}
@Override
public <T extends DatabaseSchema<TableId>> Object getColumnValue(
ResultSet rs, int columnIndex, Column column, Table table, T schema)
throws SQLException {
return mysqlFieldReader.readField(rs, columnIndex, column, table);
}
@Override
public String quotedTableIdString(TableId tableId) {
return tableId.toQuotedString('`');
}
}

@ -0,0 +1,84 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.debezium;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import io.debezium.connector.mysql.MySqlConnection;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
/** Tests for {@link DebeziumUtils}. */
public class DebeziumUtilsTest {
@Test
void testCreateMySqlConnection() {
// test without set useSSL
Properties jdbcProps = new Properties();
jdbcProps.setProperty("onlyTest", "test");
MySqlSourceConfig configWithoutUseSSL = getConfig(jdbcProps);
MySqlConnection connection0 = DebeziumUtils.createMySqlConnection(configWithoutUseSSL);
assertEquals(
"jdbc:mysql://localhost:3306/?useSSL=false&connectTimeout=20000&useInformationSchema=true"
+ "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&onlyTest=test"
+ "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true",
connection0.connectionString());
// test with set useSSL=false
jdbcProps.setProperty("useSSL", "false");
MySqlSourceConfig configNotUseSSL = getConfig(jdbcProps);
MySqlConnection connection1 = DebeziumUtils.createMySqlConnection(configNotUseSSL);
assertEquals(
"jdbc:mysql://localhost:3306/?connectTimeout=20000&useInformationSchema=true"
+ "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&useSSL=false&onlyTest=test"
+ "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true",
connection1.connectionString());
// test with set useSSL=true
jdbcProps.setProperty("useSSL", "true");
MySqlSourceConfig configUseSSL = getConfig(jdbcProps);
MySqlConnection connection2 = DebeziumUtils.createMySqlConnection(configUseSSL);
assertEquals(
"jdbc:mysql://localhost:3306/?connectTimeout=20000&useInformationSchema=true"
+ "&nullCatalogMeansCurrent=false&characterSetResults=UTF-8&useSSL=true&onlyTest=test"
+ "&zeroDateTimeBehavior=CONVERT_TO_NULL&characterEncoding=UTF-8&useUnicode=true",
connection2.connectionString());
}
private MySqlSourceConfig getConfig(Properties jdbcProperties) {
return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.initial())
.databaseList("fakeDb")
.tableList("fakeDb.fakeTable")
.includeSchemaChanges(false)
.hostname("localhost")
.port(3306)
.splitSize(10)
.fetchSize(2)
.connectTimeout(Duration.ofSeconds(20))
.username("fakeUser")
.password("fakePw")
.serverTimeZone(ZoneId.of("UTC").toString())
.jdbcProperties(jdbcProperties)
.createConfig(0);
}
}

@ -84,7 +84,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
customerDatabase.createAndInitialize(); customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}); MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
final DataType dataType = final DataType dataType =
DataTypes.ROW( DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("id", DataTypes.BIGINT()),
@ -123,7 +123,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
customerDatabase.createAndInitialize(); customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}); MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
final DataType dataType = final DataType dataType =
DataTypes.ROW( DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("id", DataTypes.BIGINT()),
@ -171,7 +171,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
customerDatabase.createAndInitialize(); customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"}); MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
final DataType dataType = final DataType dataType =
DataTypes.ROW( DataTypes.ROW(
@ -207,7 +207,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
MySqlSourceConfig sourceConfig = MySqlSourceConfig sourceConfig =
getConfig(new String[] {"customer_card", "customer_card_single_line"}); getConfig(new String[] {"customer_card", "customer_card_single_line"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
final DataType dataType = final DataType dataType =
DataTypes.ROW( DataTypes.ROW(
DataTypes.FIELD("card_no", DataTypes.BIGINT()), DataTypes.FIELD("card_no", DataTypes.BIGINT()),
@ -264,7 +264,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
MySqlSourceConfig sourceConfig = MySqlSourceConfig sourceConfig =
getConfig(StartupOptions.latest(), new String[] {"customers"}); getConfig(StartupOptions.latest(), new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
final DataType dataType = final DataType dataType =
DataTypes.ROW( DataTypes.ROW(
@ -316,7 +316,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
.debeziumProperties(dbzProps) .debeziumProperties(dbzProps)
.createConfig(0); .createConfig(0);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
// Create binlog reader and submit split // Create binlog reader and submit split
BinlogSplitReader binlogReader = createBinlogReader(sourceConfig); BinlogSplitReader binlogReader = createBinlogReader(sourceConfig);
@ -351,8 +351,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
throws Exception { throws Exception {
MySqlBinlogSplitAssigner binlogSplitAssigner = new MySqlBinlogSplitAssigner(sourceConfig); MySqlBinlogSplitAssigner binlogSplitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
binlogSplitAssigner.open(); binlogSplitAssigner.open();
try (MySqlConnection jdbc = try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
Map<TableId, TableChanges.TableChange> tableSchemas = Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc); TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
return MySqlBinlogSplit.fillTableSchemas( return MySqlBinlogSplit.fillTableSchemas(

@ -71,7 +71,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
customerDatabase.createAndInitialize(); customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}, 10); MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}, 10);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
} }
@Test @Test

@ -52,6 +52,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Properties;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -753,7 +754,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
io.debezium.config.Configuration configuration = io.debezium.config.Configuration configuration =
io.debezium.config.Configuration.from(properties); io.debezium.config.Configuration.from(properties);
return DebeziumUtils.createMySqlConnection(configuration); return DebeziumUtils.createMySqlConnection(configuration, new Properties());
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------

@ -92,8 +92,7 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING())); DataTypes.FIELD("phone_number", DataTypes.STRING()));
MySqlSplit binlogSplit; MySqlSplit binlogSplit;
try (MySqlConnection jdbc = try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
Map<TableId, TableChanges.TableChange> tableSchemas = Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc); TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
binlogSplit = binlogSplit =

@ -133,6 +133,18 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
@Test @Test
public void testConsumingAllEvents() throws Exception { public void testConsumingAllEvents() throws Exception {
runConsumingAllEventsTest("");
}
@Test
public void testConsumingAllEventsUseSSL() throws Exception {
runConsumingAllEventsTest(
", 'jdbc.properties.useSSL'= 'true',"
+ " 'jdbc.properties.requireSSL'= 'true',"
+ " 'jdbc.properties.verifyServerCerticate'= 'false'");
}
private void runConsumingAllEventsTest(String otherTableOptions) throws Exception {
inventoryDatabase.createAndInitialize(); inventoryDatabase.createAndInitialize();
String sourceDDL = String sourceDDL =
String.format( String.format(
@ -154,6 +166,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
+ " 'scan.incremental.snapshot.enabled' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s'," + " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'" + " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ " %s"
+ ")", + ")",
MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(), MYSQL_CONTAINER.getDatabasePort(),
@ -164,7 +177,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
getDezImplementation(), getDezImplementation(),
incrementalSnapshot, incrementalSnapshot,
getServerId(), getServerId(),
getSplitSize()); getSplitSize(),
otherTableOptions);
String sinkDDL = String sinkDDL =
"CREATE TABLE sink (" "CREATE TABLE sink ("
+ " name STRING," + " name STRING,"

Loading…
Cancel
Save