Support "debezium.*" option to pass through the Debezium's properties

This closes #15
pull/19/head
qinshiwei 5 years ago committed by Jark Wu
parent 83d75b551d
commit c89b46f14a
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -0,0 +1,29 @@
package com.alibaba.ververica.cdc.debezium.table;
import java.util.Map;
import java.util.Properties;
/** Option utils for Debezium options. */
public class DebeziumOptions {
public static final String DEBEZIUM_OPTIONS_PREFIX = "debezium.";
public static Properties getDebeziumProperties(Map<String, String> properties) {
final Properties debeziumProperties = new Properties();
if (hasDebeziumProperties(properties)) {
properties.keySet().stream()
.filter(key -> key.startsWith(DEBEZIUM_OPTIONS_PREFIX))
.forEach(key -> {
final String value = properties.get(key);
final String subKey = key.substring((DEBEZIUM_OPTIONS_PREFIX).length());
debeziumProperties.put(subKey, value);
});
}
return debeziumProperties;
}
/** Decides if the table options contains Debezium client properties that start with prefix 'debezium'. */
private static boolean hasDebeziumProperties(Map<String, String> debeziumOptions) {
return debeziumOptions.keySet().stream().anyMatch(k -> k.startsWith(DEBEZIUM_OPTIONS_PREFIX));
}
}

@ -47,6 +47,7 @@ public class MySQLSource {
private String password;
private Integer serverId;
private String[] tableList;
private Properties dbzProperties;
private DebeziumDeserializationSchema<T> deserializer;
public Builder<T> hostname(String hostname) {
@ -110,6 +111,14 @@ public class MySQLSource {
return this;
}
/**
* The Debezium MySQL connector properties. For example, "snapshot.mode".
*/
public Builder<T> debeziumProperties(Properties properties) {
this.dbzProperties = properties;
return this;
}
/**
* The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}.
*/
@ -141,6 +150,11 @@ public class MySQLSource {
if (tableList != null) {
props.setProperty("table.whitelist", String.join(",", tableList));
}
if (dbzProperties != null) {
dbzProperties.forEach(props::put);
}
return new DebeziumSourceFunction<>(
deserializer,
props);

@ -37,6 +37,7 @@ import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
@ -54,6 +55,7 @@ public class MySQLTableSource implements ScanTableSource {
private final String password;
private final Integer serverId;
private final String tableName;
private final Properties dbzProperties;
public MySQLTableSource(
TableSchema physicalSchema,
@ -63,6 +65,7 @@ public class MySQLTableSource implements ScanTableSource {
String tableName,
String username,
String password,
Properties dbzProperties,
@Nullable Integer serverId) {
this.physicalSchema = physicalSchema;
this.port = port;
@ -72,6 +75,7 @@ public class MySQLTableSource implements ScanTableSource {
this.username = checkNotNull(username);
this.password = checkNotNull(password);
this.serverId = serverId;
this.dbzProperties = dbzProperties;
}
@Override
@ -99,6 +103,7 @@ public class MySQLTableSource implements ScanTableSource {
.tableList(database + "." + tableName)
.username(username)
.password(password)
.debeziumProperties(dbzProperties)
.deserializer(deserializer);
Optional.ofNullable(serverId).ifPresent(builder::serverId);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
@ -116,6 +121,7 @@ public class MySQLTableSource implements ScanTableSource {
tableName,
username,
password,
dbzProperties,
serverId
);
}
@ -136,12 +142,13 @@ public class MySQLTableSource implements ScanTableSource {
Objects.equals(username, that.username) &&
Objects.equals(password, that.password) &&
Objects.equals(serverId, that.serverId) &&
Objects.equals(tableName, that.tableName);
Objects.equals(tableName, that.tableName) &&
Objects.equals(dbzProperties, that.dbzProperties);
}
@Override
public int hashCode() {
return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName);
return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName, dbzProperties);
}
@Override

@ -18,6 +18,8 @@
package com.alibaba.ververica.cdc.connectors.mysql.table;
import static com.alibaba.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
@ -27,6 +29,8 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import com.alibaba.ververica.cdc.debezium.table.DebeziumOptions;
import java.util.HashSet;
import java.util.Set;
@ -78,7 +82,7 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
@ -98,6 +102,7 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
tableName,
username,
password,
getDebeziumProperties(context.getCatalogTable().getOptions()),
serverId
);
}

@ -34,6 +34,7 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -57,6 +58,7 @@ public class MySQLTableSourceFactoryTest {
private static final String MY_PASSWORD = "flinkpw";
private static final String MY_DATABASE = "myDB";
private static final String MY_TABLE = "myTable";
private static final Properties PROPERTIES = new Properties();
@Test
public void testCommonProperties() {
@ -72,6 +74,7 @@ public class MySQLTableSourceFactoryTest {
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
PROPERTIES,
null
);
assertEquals(expectedSource, actualSource);
@ -79,11 +82,14 @@ public class MySQLTableSourceFactoryTest {
@Test
public void testOptionalProperties() {
Map<String, String> properties = getAllOptions();
properties.put("port", "3307");
properties.put("server-id", "4321");
DynamicTableSource actualSource = createTableSource(properties);
Map<String, String> options = getAllOptions();
options.put("port", "3307");
options.put("server-id", "4321");
options.put("debezium.snapshot.mode", "never");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
dbzProperties.put("snapshot.mode", "never");
MySQLTableSource expectedSource = new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
3307,
@ -92,6 +98,7 @@ public class MySQLTableSourceFactoryTest {
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
dbzProperties,
4321
);
assertEquals(expectedSource, actualSource);

@ -49,6 +49,7 @@ public class PostgreSQLSource {
private String password;
private String[] schemaList;
private String[] tableList;
private Properties dbzProperties;
private DebeziumDeserializationSchema<T> deserializer;
/**
@ -119,6 +120,14 @@ public class PostgreSQLSource {
return this;
}
/**
* The Debezium Postgres connector properties.
*/
public Builder<T> debeziumProperties(Properties properties) {
this.dbzProperties = properties;
return this;
}
/**
* The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}.
*/
@ -149,6 +158,11 @@ public class PostgreSQLSource {
if (tableList != null) {
props.setProperty("table.whitelist", String.join(",", tableList));
}
if (dbzProperties != null) {
dbzProperties.forEach(props::put);
}
return new DebeziumSourceFunction<>(
deserializer,
props);

@ -18,6 +18,9 @@
package com.alibaba.ververica.cdc.connectors.postgres.table;
import static com.alibaba.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static com.alibaba.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
@ -83,7 +86,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
@Override
public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
@ -105,7 +108,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
tableName,
username,
password,
pluginName);
pluginName,
getDebeziumProperties(context.getCatalogTable().getOptions()));
}
@Override

@ -34,6 +34,7 @@ import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import java.util.Objects;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
@ -52,6 +53,7 @@ public class PostgreSQLTableSource implements ScanTableSource {
private final String username;
private final String password;
private final String pluginName;
private final Properties dbzProperties;
public PostgreSQLTableSource(
TableSchema physicalSchema,
@ -62,7 +64,8 @@ public class PostgreSQLTableSource implements ScanTableSource {
String tableName,
String username,
String password,
String pluginName) {
String pluginName,
Properties dbzProperties) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@ -72,6 +75,7 @@ public class PostgreSQLTableSource implements ScanTableSource {
this.username = checkNotNull(username);
this.password = checkNotNull(password);
this.pluginName = checkNotNull(pluginName);
this.dbzProperties = dbzProperties;
}
@Override
@ -102,6 +106,7 @@ public class PostgreSQLTableSource implements ScanTableSource {
.username(username)
.password(password)
.decodingPluginName(pluginName)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
@ -118,7 +123,8 @@ public class PostgreSQLTableSource implements ScanTableSource {
tableName,
username,
password,
pluginName);
pluginName,
dbzProperties);
}
@Override
@ -137,12 +143,13 @@ public class PostgreSQLTableSource implements ScanTableSource {
Objects.equals(schemaName, that.schemaName) &&
Objects.equals(tableName, that.tableName) &&
Objects.equals(username, that.username) &&
Objects.equals(password, that.password);
Objects.equals(password, that.password) &&
Objects.equals(dbzProperties, that.dbzProperties);
}
@Override
public int hashCode() {
return Objects.hash(physicalSchema, port, hostname, database, schemaName, tableName, username, password);
return Objects.hash(physicalSchema, port, hostname, database, schemaName, tableName, username, password, dbzProperties);
}
@Override

@ -34,6 +34,7 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -58,6 +59,7 @@ public class PostgreSQLTableFactoryTest {
private static final String MY_DATABASE = "myDB";
private static final String MY_TABLE = "myTable";
private static final String MY_SCHEMA = "public";
private static final Properties PROPERTIES = new Properties();
@Test
public void testCommonProperties() {
@ -74,17 +76,21 @@ public class PostgreSQLTableFactoryTest {
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
"decoderbufs");
"decoderbufs",
PROPERTIES);
assertEquals(expectedSource, actualSource);
}
@Test
public void testOptionalProperties() {
Map<String, String> properties = getAllOptions();
properties.put("port", "5444");
properties.put("decoding.plugin.name", "wal2json");
DynamicTableSource actualSource = createTableSource(properties);
Map<String, String> options = getAllOptions();
options.put("port", "5444");
options.put("decoding.plugin.name", "wal2json");
options.put("debezium.snapshot.mode", "never");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
dbzProperties.put("snapshot.mode", "never");
PostgreSQLTableSource expectedSource = new PostgreSQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
5444,
@ -94,7 +100,8 @@ public class PostgreSQLTableFactoryTest {
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
"wal2json");
"wal2json",
dbzProperties);
assertEquals(expectedSource, actualSource);
}

Loading…
Cancel
Save