[postgres] Support 'slot.name' option for postgres-cdc connector

release-1.2
Jark Wu 4 years ago
parent 36e1979337
commit 0ce58b4ff5
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -42,6 +42,7 @@ public class PostgreSQLSource {
public static class Builder<T> {
private String pluginName = "decoderbufs";
private String slotName = "flink";
private int port = 5432; // default 5432 port
private String hostname;
private String database;
@ -120,6 +121,20 @@ public class PostgreSQLSource {
return this;
}
/**
* The name of the PostgreSQL logical decoding slot that was created for streaming changes
* from a particular plug-in for a particular database/schema. The server uses this slot
* to stream events to the connector that you are configuring. Default is "flink".
*
* <p>Slot names must conform to <a href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL replication slot naming rules</a>,
* which state: "Each replication slot has a name, which can contain lower-case letters,
* numbers, and the underscore character."
*/
public Builder<T> slotName(String slotName) {
this.slotName = slotName;
return this;
}
/**
* The Debezium Postgres connector properties.
*/
@ -145,12 +160,13 @@ public class PostgreSQLSource {
// database server/cluster being monitored. The logical name should be unique across
// all other connectors, since it is used as a prefix for all Kafka topic names coming
// from this connector. Only alphanumeric characters and underscores should be used.
props.setProperty("database.server.name", "postgres_binlog_source");
props.setProperty("database.server.name", "postgres_cdc_source");
props.setProperty("database.hostname", checkNotNull(hostname));
props.setProperty("database.dbname", checkNotNull(database));
props.setProperty("database.user", checkNotNull(username));
props.setProperty("database.password", checkNotNull(password));
props.setProperty("database.port", String.valueOf(port));
props.setProperty("slot.name", slotName);
if (schemaList != null) {
props.setProperty("schema.whitelist", String.join(",", schemaList));

@ -83,6 +83,13 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
"Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n" +
"wal2json_rds_streaming and pgoutput.");
private static final ConfigOption<String> SLOT_NAME = ConfigOptions.key("slot.name")
.stringType()
.defaultValue("flink")
.withDescription("The name of the PostgreSQL logical decoding slot that was created for streaming changes " +
"from a particular plug-in for a particular database/schema. The server uses this slot " +
"to stream events to the connector that you are configuring. Default is \"flink\".");
@Override
public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
@ -97,6 +104,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
String tableName = config.get(TABLE_NAME);
int port = config.get(PORT);
String pluginName = config.get(DECODING_PLUGIN_NAME);
String slotName = config.get(SLOT_NAME);
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new PostgreSQLTableSource(
@ -109,6 +117,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
username,
password,
pluginName,
slotName,
getDebeziumProperties(context.getCatalogTable().getOptions()));
}

@ -54,6 +54,7 @@ public class PostgreSQLTableSource implements ScanTableSource {
private final String username;
private final String password;
private final String pluginName;
private final String slotName;
private final Properties dbzProperties;
public PostgreSQLTableSource(
@ -66,6 +67,7 @@ public class PostgreSQLTableSource implements ScanTableSource {
String username,
String password,
String pluginName,
String slotName,
Properties dbzProperties) {
this.physicalSchema = physicalSchema;
this.port = port;
@ -76,6 +78,7 @@ public class PostgreSQLTableSource implements ScanTableSource {
this.username = checkNotNull(username);
this.password = checkNotNull(password);
this.pluginName = checkNotNull(pluginName);
this.slotName = slotName;
this.dbzProperties = dbzProperties;
}
@ -89,7 +92,6 @@ public class PostgreSQLTableSource implements ScanTableSource {
.build();
}
@SuppressWarnings("unchecked")
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
@ -108,6 +110,7 @@ public class PostgreSQLTableSource implements ScanTableSource {
.username(username)
.password(password)
.decodingPluginName(pluginName)
.slotName(slotName)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
.build();
@ -126,6 +129,7 @@ public class PostgreSQLTableSource implements ScanTableSource {
username,
password,
pluginName,
slotName,
dbzProperties);
}
@ -146,12 +150,14 @@ public class PostgreSQLTableSource implements ScanTableSource {
Objects.equals(tableName, that.tableName) &&
Objects.equals(username, that.username) &&
Objects.equals(password, that.password) &&
Objects.equals(pluginName, that.pluginName) &&
Objects.equals(slotName, that.slotName) &&
Objects.equals(dbzProperties, that.dbzProperties);
}
@Override
public int hashCode() {
return Objects.hash(physicalSchema, port, hostname, database, schemaName, tableName, username, password, dbzProperties);
return Objects.hash(physicalSchema, port, hostname, database, schemaName, tableName, username, password, pluginName, slotName, dbzProperties);
}
@Override

@ -77,6 +77,7 @@ public class PostgreSQLTableFactoryTest {
MY_USERNAME,
MY_PASSWORD,
"decoderbufs",
"flink",
PROPERTIES);
assertEquals(expectedSource, actualSource);
}
@ -101,6 +102,7 @@ public class PostgreSQLTableFactoryTest {
MY_USERNAME,
MY_PASSWORD,
"wal2json",
"flink",
dbzProperties);
assertEquals(expectedSource, actualSource);
}

Loading…
Cancel
Save