diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/table/DebeziumOptions.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/table/DebeziumOptions.java new file mode 100644 index 000000000..22fda7d9d --- /dev/null +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/table/DebeziumOptions.java @@ -0,0 +1,29 @@ +package com.alibaba.ververica.cdc.debezium.table; + +import java.util.Map; +import java.util.Properties; + +/** Option utils for Debezium options. */ +public class DebeziumOptions { + public static final String DEBEZIUM_OPTIONS_PREFIX = "debezium."; + + public static Properties getDebeziumProperties(Map properties) { + final Properties debeziumProperties = new Properties(); + + if (hasDebeziumProperties(properties)) { + properties.keySet().stream() + .filter(key -> key.startsWith(DEBEZIUM_OPTIONS_PREFIX)) + .forEach(key -> { + final String value = properties.get(key); + final String subKey = key.substring((DEBEZIUM_OPTIONS_PREFIX).length()); + debeziumProperties.put(subKey, value); + }); + } + return debeziumProperties; + } + + /** Decides if the table options contains Debezium client properties that start with prefix 'debezium'. */ + private static boolean hasDebeziumProperties(Map debeziumOptions) { + return debeziumOptions.keySet().stream().anyMatch(k -> k.startsWith(DEBEZIUM_OPTIONS_PREFIX)); + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java index ac17e7e8e..f7af68462 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java @@ -47,6 +47,7 @@ public class MySQLSource { private String password; private Integer serverId; private String[] tableList; + private Properties dbzProperties; private DebeziumDeserializationSchema deserializer; public Builder hostname(String hostname) { @@ -110,6 +111,14 @@ public class MySQLSource { return this; } + /** + * The Debezium MySQL connector properties. For example, "snapshot.mode". + */ + public Builder debeziumProperties(Properties properties) { + this.dbzProperties = properties; + return this; + } + /** * The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}. */ @@ -141,6 +150,11 @@ public class MySQLSource { if (tableList != null) { props.setProperty("table.whitelist", String.join(",", tableList)); } + + if (dbzProperties != null) { + dbzProperties.forEach(props::put); + } + return new DebeziumSourceFunction<>( deserializer, props); diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSource.java index f17331474..946acfdd1 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSource.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import java.util.Objects; import java.util.Optional; +import java.util.Properties; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -54,6 +55,7 @@ public class MySQLTableSource implements ScanTableSource { private final String password; private final Integer serverId; private final String tableName; + private final Properties dbzProperties; public MySQLTableSource( TableSchema physicalSchema, @@ -63,6 +65,7 @@ public class MySQLTableSource implements ScanTableSource { String tableName, String username, String password, + Properties dbzProperties, @Nullable Integer serverId) { this.physicalSchema = physicalSchema; this.port = port; @@ -72,6 +75,7 @@ public class MySQLTableSource implements ScanTableSource { this.username = checkNotNull(username); this.password = checkNotNull(password); this.serverId = serverId; + this.dbzProperties = dbzProperties; } @Override @@ -99,6 +103,7 @@ public class MySQLTableSource implements ScanTableSource { .tableList(database + "." + tableName) .username(username) .password(password) + .debeziumProperties(dbzProperties) .deserializer(deserializer); Optional.ofNullable(serverId).ifPresent(builder::serverId); DebeziumSourceFunction sourceFunction = builder.build(); @@ -116,6 +121,7 @@ public class MySQLTableSource implements ScanTableSource { tableName, username, password, + dbzProperties, serverId ); } @@ -136,12 +142,13 @@ public class MySQLTableSource implements ScanTableSource { Objects.equals(username, that.username) && Objects.equals(password, that.password) && Objects.equals(serverId, that.serverId) && - Objects.equals(tableName, that.tableName); + Objects.equals(tableName, that.tableName) && + Objects.equals(dbzProperties, that.dbzProperties); } @Override public int hashCode() { - return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName); + return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName, dbzProperties); } @Override diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java index e14e652e2..11ce6d1eb 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java @@ -18,6 +18,8 @@ package com.alibaba.ververica.cdc.connectors.mysql.table; +import static com.alibaba.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; + import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; @@ -27,6 +29,8 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.utils.TableSchemaUtils; +import com.alibaba.ververica.cdc.debezium.table.DebeziumOptions; + import java.util.HashSet; import java.util.Set; @@ -78,7 +82,7 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory { @Override public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - helper.validate(); + helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX); final ReadableConfig config = helper.getOptions(); String hostname = config.get(HOSTNAME); @@ -98,6 +102,7 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory { tableName, username, password, + getDebeziumProperties(context.getCatalogTable().getOptions()), serverId ); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java index 40840cfe3..e99666f88 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -57,6 +58,7 @@ public class MySQLTableSourceFactoryTest { private static final String MY_PASSWORD = "flinkpw"; private static final String MY_DATABASE = "myDB"; private static final String MY_TABLE = "myTable"; + private static final Properties PROPERTIES = new Properties(); @Test public void testCommonProperties() { @@ -72,6 +74,7 @@ public class MySQLTableSourceFactoryTest { MY_TABLE, MY_USERNAME, MY_PASSWORD, + PROPERTIES, null ); assertEquals(expectedSource, actualSource); @@ -79,11 +82,14 @@ public class MySQLTableSourceFactoryTest { @Test public void testOptionalProperties() { - Map properties = getAllOptions(); - properties.put("port", "3307"); - properties.put("server-id", "4321"); - - DynamicTableSource actualSource = createTableSource(properties); + Map options = getAllOptions(); + options.put("port", "3307"); + options.put("server-id", "4321"); + options.put("debezium.snapshot.mode", "never"); + + DynamicTableSource actualSource = createTableSource(options); + Properties dbzProperties = new Properties(); + dbzProperties.put("snapshot.mode", "never"); MySQLTableSource expectedSource = new MySQLTableSource( TableSchemaUtils.getPhysicalSchema(SCHEMA), 3307, @@ -92,6 +98,7 @@ public class MySQLTableSourceFactoryTest { MY_TABLE, MY_USERNAME, MY_PASSWORD, + dbzProperties, 4321 ); assertEquals(expectedSource, actualSource); diff --git a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSource.java b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSource.java index 149b7fd1a..48f12de9b 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSource.java +++ b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSource.java @@ -49,6 +49,7 @@ public class PostgreSQLSource { private String password; private String[] schemaList; private String[] tableList; + private Properties dbzProperties; private DebeziumDeserializationSchema deserializer; /** @@ -119,6 +120,14 @@ public class PostgreSQLSource { return this; } + /** + * The Debezium Postgres connector properties. + */ + public Builder debeziumProperties(Properties properties) { + this.dbzProperties = properties; + return this; + } + /** * The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}. */ @@ -149,6 +158,11 @@ public class PostgreSQLSource { if (tableList != null) { props.setProperty("table.whitelist", String.join(",", tableList)); } + + if (dbzProperties != null) { + dbzProperties.forEach(props::put); + } + return new DebeziumSourceFunction<>( deserializer, props); diff --git a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java index e60d14a6b..d420f7ed6 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java +++ b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java @@ -18,6 +18,9 @@ package com.alibaba.ververica.cdc.connectors.postgres.table; +import static com.alibaba.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; +import static com.alibaba.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; + import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; @@ -83,7 +86,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { @Override public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - helper.validate(); + helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX); final ReadableConfig config = helper.getOptions(); String hostname = config.get(HOSTNAME); @@ -105,7 +108,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { tableName, username, password, - pluginName); + pluginName, + getDebeziumProperties(context.getCatalogTable().getOptions())); } @Override diff --git a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java index e5b829207..df4ed5b8d 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java +++ b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java @@ -34,6 +34,7 @@ import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; import java.util.Objects; +import java.util.Properties; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -52,6 +53,7 @@ public class PostgreSQLTableSource implements ScanTableSource { private final String username; private final String password; private final String pluginName; + private final Properties dbzProperties; public PostgreSQLTableSource( TableSchema physicalSchema, @@ -62,7 +64,8 @@ public class PostgreSQLTableSource implements ScanTableSource { String tableName, String username, String password, - String pluginName) { + String pluginName, + Properties dbzProperties) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -72,6 +75,7 @@ public class PostgreSQLTableSource implements ScanTableSource { this.username = checkNotNull(username); this.password = checkNotNull(password); this.pluginName = checkNotNull(pluginName); + this.dbzProperties = dbzProperties; } @Override @@ -102,6 +106,7 @@ public class PostgreSQLTableSource implements ScanTableSource { .username(username) .password(password) .decodingPluginName(pluginName) + .debeziumProperties(dbzProperties) .deserializer(deserializer) .build(); return SourceFunctionProvider.of(sourceFunction, false); @@ -118,7 +123,8 @@ public class PostgreSQLTableSource implements ScanTableSource { tableName, username, password, - pluginName); + pluginName, + dbzProperties); } @Override @@ -137,12 +143,13 @@ public class PostgreSQLTableSource implements ScanTableSource { Objects.equals(schemaName, that.schemaName) && Objects.equals(tableName, that.tableName) && Objects.equals(username, that.username) && - Objects.equals(password, that.password); + Objects.equals(password, that.password) && + Objects.equals(dbzProperties, that.dbzProperties); } @Override public int hashCode() { - return Objects.hash(physicalSchema, port, hostname, database, schemaName, tableName, username, password); + return Objects.hash(physicalSchema, port, hostname, database, schemaName, tableName, username, password, dbzProperties); } @Override diff --git a/flink-connector-postgres-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-connector-postgres-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java index 6744c586d..7c1b4fc9c 100644 --- a/flink-connector-postgres-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java +++ b/flink-connector-postgres-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -58,6 +59,7 @@ public class PostgreSQLTableFactoryTest { private static final String MY_DATABASE = "myDB"; private static final String MY_TABLE = "myTable"; private static final String MY_SCHEMA = "public"; + private static final Properties PROPERTIES = new Properties(); @Test public void testCommonProperties() { @@ -74,17 +76,21 @@ public class PostgreSQLTableFactoryTest { MY_TABLE, MY_USERNAME, MY_PASSWORD, - "decoderbufs"); + "decoderbufs", + PROPERTIES); assertEquals(expectedSource, actualSource); } @Test public void testOptionalProperties() { - Map properties = getAllOptions(); - properties.put("port", "5444"); - properties.put("decoding.plugin.name", "wal2json"); - - DynamicTableSource actualSource = createTableSource(properties); + Map options = getAllOptions(); + options.put("port", "5444"); + options.put("decoding.plugin.name", "wal2json"); + options.put("debezium.snapshot.mode", "never"); + + DynamicTableSource actualSource = createTableSource(options); + Properties dbzProperties = new Properties(); + dbzProperties.put("snapshot.mode", "never"); PostgreSQLTableSource expectedSource = new PostgreSQLTableSource( TableSchemaUtils.getPhysicalSchema(SCHEMA), 5444, @@ -94,7 +100,8 @@ public class PostgreSQLTableFactoryTest { MY_TABLE, MY_USERNAME, MY_PASSWORD, - "wal2json"); + "wal2json", + dbzProperties); assertEquals(expectedSource, actualSource); }