[mysql] Allow custom JDBC URL parameters in MySQL CDC source (#921)

pull/947/head
Paul Zhang 3 years ago committed by GitHub
parent d122877ddc
commit 580391b6ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -252,6 +252,13 @@ During a snapshot operation, the connector will query each included table to pro
<td>Integer</td>
<td>The connection pool size.</td>
</tr>
<tr>
<td>jdbc.properties.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">20</td>
<td>String</td>
<td>Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'.</td>
</tr>
<tr>
<td>heartbeat.interval</td>
<td>optional</td>

@ -202,6 +202,12 @@ public class MySqlSourceBuilder<T> {
return this;
}
/** Custom properties that will overwrite the default JDBC connection URL. */
public MySqlSourceBuilder<T> jdbcProperties(Properties jdbcProperties) {
this.configFactory.jdbcProperties(jdbcProperties);
return this;
}
/** The Debezium MySQL connector properties. For example, "snapshot.mode". */
public MySqlSourceBuilder<T> debeziumProperties(Properties properties) {
this.configFactory.debeziumProperties(properties);

@ -56,6 +56,7 @@ public class MySqlSourceConfig implements Serializable {
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@ -84,7 +85,8 @@ public class MySqlSourceConfig implements Serializable {
double distributionFactorLower,
boolean includeSchemaChanges,
boolean scanNewlyAddedTableEnabled,
Properties dbzProperties) {
Properties dbzProperties,
Properties jdbcProperties) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@ -107,6 +109,7 @@ public class MySqlSourceConfig implements Serializable {
this.dbzProperties = checkNotNull(dbzProperties);
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
this.jdbcProperties = jdbcProperties;
}
public String getHostname() {
@ -201,4 +204,8 @@ public class MySqlSourceConfig implements Serializable {
public RelationalTableFilters getTableFilters() {
return dbzMySqlConfig.getTableFilters();
}
public Properties getJdbcProperties() {
return jdbcProperties;
}
}

@ -70,6 +70,7 @@ public class MySqlSourceConfigFactory implements Serializable {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean scanNewlyAddedTableEnabled = false;
private Properties jdbcProperties;
private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue();
private Properties dbzProperties;
@ -216,6 +217,12 @@ public class MySqlSourceConfigFactory implements Serializable {
return this;
}
/** Custom properties that will overwrite the default JDBC connection URL. */
public MySqlSourceConfigFactory jdbcProperties(Properties jdbcProperties) {
this.jdbcProperties = jdbcProperties;
return this;
}
/** Specifies the startup options. */
public MySqlSourceConfigFactory startupOptions(StartupOptions startupOptions) {
switch (startupOptions.startupMode) {
@ -299,6 +306,10 @@ public class MySqlSourceConfigFactory implements Serializable {
props.putAll(dbzProperties);
}
if (jdbcProperties == null) {
jdbcProperties = new Properties();
}
return new MySqlSourceConfig(
hostname,
port,
@ -319,6 +330,7 @@ public class MySqlSourceConfigFactory implements Serializable {
distributionFactorLower,
includeSchemaChanges,
scanNewlyAddedTableEnabled,
props);
props,
jdbcProperties);
}
}

@ -23,14 +23,17 @@ import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import java.util.Properties;
/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
public class PooledDataSourceFactory {
public static final String JDBC_URL_PATTERN =
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true";
public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
public static final String SERVER_TIMEZONE_KEY = "serverTimezone";
public static final int MINIMUM_POOL_SIZE = 1;
private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties();
private PooledDataSourceFactory() {}
@ -39,9 +42,10 @@ public class PooledDataSourceFactory {
String hostName = sourceConfig.getHostname();
int port = sourceConfig.getPort();
Properties jdbcProperties = sourceConfig.getJdbcProperties();
config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port);
config.setJdbcUrl(String.format(JDBC_URL_PATTERN, hostName, port));
config.setJdbcUrl(formatJdbcUrl(hostName, port, jdbcProperties));
config.setUsername(sourceConfig.getUsername());
config.setPassword(sourceConfig.getPassword());
config.setMinimumIdle(MINIMUM_POOL_SIZE);
@ -58,4 +62,28 @@ public class PooledDataSourceFactory {
return new HikariDataSource(config);
}
private static String formatJdbcUrl(String hostName, int port, Properties jdbcProperties) {
Properties combinedProperties = new Properties();
combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES);
combinedProperties.putAll(jdbcProperties);
StringBuilder jdbcUrlStringBuilder =
new StringBuilder(String.format(JDBC_URL_PATTERN, hostName, port));
combinedProperties.forEach(
(key, value) -> {
jdbcUrlStringBuilder.append("&").append(key).append("=").append(value);
});
return jdbcUrlStringBuilder.toString();
}
private static Properties initializeDefaultJdbcProperties() {
Properties defaultJdbcProperties = new Properties();
defaultJdbcProperties.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL");
defaultJdbcProperties.setProperty("characterEncoding", "UTF-8");
defaultJdbcProperties.setProperty("characterSetResults", "UTF-8");
return defaultJdbcProperties;
}
}

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.table;
import java.util.Map;
import java.util.Properties;
/** Option utils for JDBC URL properties. */
public class JdbcUrlUtils {
// Prefix for JDBC specific properties.
public static final String PROPERTIES_PREFIX = "jdbc.properties.";
public static Properties getJdbcProperties(Map<String, String> tableOptions) {
Properties jdbcProperties = new Properties();
if (hasJdbcProperties(tableOptions)) {
tableOptions.keySet().stream()
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.forEach(
key -> {
final String value = tableOptions.get(key);
final String subKey = key.substring((PROPERTIES_PREFIX).length());
jdbcProperties.put(subKey, value);
});
}
return jdbcProperties;
}
/**
* Decides if the table options contains JDBC properties that start with prefix
* 'jdbc.properties'.
*/
private static boolean hasJdbcProperties(Map<String, String> tableOptions) {
return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
}
}

@ -79,6 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final double distributionFactorLower;
private final StartupOptions startupOptions;
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;
private final Duration heartbeatInterval;
// --------------------------------------------------------------------------------------------
@ -135,6 +136,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
distributionFactorLower,
startupOptions,
false,
new Properties(),
heartbeatInterval);
}
@ -160,6 +162,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
double distributionFactorLower,
StartupOptions startupOptions,
boolean scanNewlyAddedTableEnabled,
Properties jdbcProperties,
Duration heartbeatInterval) {
this.physicalSchema = physicalSchema;
this.port = port;
@ -182,6 +185,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.distributionFactorLower = distributionFactorLower;
this.startupOptions = startupOptions;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.jdbcProperties = jdbcProperties;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
@ -238,6 +242,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.startupOptions(startupOptions)
.deserializer(deserializer)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.jdbcProperties(jdbcProperties)
.heartbeatInterval(heartbeatInterval)
.build();
return SourceProvider.of(parallelSource);
@ -316,6 +321,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled,
jdbcProperties,
heartbeatInterval);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
@ -353,7 +359,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
&& Objects.equals(connectionPoolSize, that.connectionPoolSize)
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys);
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(jdbcProperties, that.jdbcProperties);
}
@Override
@ -381,7 +388,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
startupOptions,
producedDataType,
metadataKeys,
scanNewlyAddedTableEnabled);
scanNewlyAddedTableEnabled,
jdbcProperties);
}
@Override

@ -73,7 +73,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
helper.validateExcept(
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
@ -135,6 +136,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled,
JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
heartbeatInterval);
}

@ -121,6 +121,8 @@ public class MySqlTableSourceFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -161,6 +163,8 @@ public class MySqlTableSourceFactoryTest {
40.5d,
0.01d,
StartupOptions.initial(),
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -198,6 +202,8 @@ public class MySqlTableSourceFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -233,6 +239,8 @@ public class MySqlTableSourceFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest(),
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -245,11 +253,14 @@ public class MySqlTableSourceFactoryTest {
options.put("server-time-zone", "Asia/Shanghai");
options.put("scan.newly-added-table.enabled", "true");
options.put("debezium.snapshot.mode", "never");
options.put("jdbc.properties.useSSL", "false");
options.put("heartbeat.interval", "15213ms");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
dbzProperties.put("snapshot.mode", "never");
Properties jdbcProperties = new Properties();
jdbcProperties.setProperty("useSSL", "false");
MySqlTableSource expectedSource =
new MySqlTableSource(
SCHEMA,
@ -273,6 +284,7 @@ public class MySqlTableSourceFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
true,
jdbcProperties,
Duration.ofMillis(15213));
assertEquals(expectedSource, actualSource);
}
@ -330,6 +342,8 @@ public class MySqlTableSourceFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -396,6 +410,8 @@ public class MySqlTableSourceFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest(),
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -434,6 +450,8 @@ public class MySqlTableSourceFactoryTest {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
false,
new Properties(),
HEARTBEAT_INTERVAL.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");

Loading…
Cancel
Save