Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify which connector to use; here it should be 'postgres-cdc'. |
hostname | required | (none) | String | IP address or hostname of the PostgreSQL database server. |
username | required | (none) | String | Name of the PostgreSQL user to use when connecting to the PostgreSQL database server. |
password | required | (none) | String | Password to use when connecting to the PostgreSQL database server. |
database-name | required | (none) | String | Database name of the PostgreSQL server to monitor. |
schema-name | required | (none) | String | Schema name of the PostgreSQL database to monitor. |
table-name | required | (none) | String | Table name of the PostgreSQL database to monitor. |
port | optional | 5432 | Integer | Integer port number of the PostgreSQL database server. |
slot.name | required | (none) | String | The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character." |
decoding.plugin.name | optional | decoderbufs | String | The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. |
changelog-mode | optional | all | String | The changelog mode used for encoding streaming changes. Supported values are all (which encodes changes as a retract stream using all RowKinds) and upsert (which encodes changes as an upsert stream that describes idempotent updates on a key). upsert mode can be used for tables with primary keys when replica identity FULL is not an option. A primary key must be defined to use upsert mode. |
heartbeat.interval.ms | optional | 30s | Duration | The interval at which heartbeat events are sent to track the latest available replication slot offsets. |
debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the Postgres server. For example: 'debezium.snapshot.mode' = 'never'. See more about Debezium's Postgres Connector properties. |
debezium.snapshot.select.statement.overrides | optional | (none) | String | If the table holds a large amount of data and you do not need all of the historical data, you can use this underlying Debezium configuration to select the data range you want to snapshot. This parameter only affects snapshots and does not affect subsequent change reading. Note: PostgreSQL must use the schema name and table name. For example: 'debezium.snapshot.select.statement.overrides' = 'schema.table'. After specifying this property, you must also add the following property: debezium.snapshot.select.statement.overrides.[schema].[table] |
debezium.snapshot.select.statement.overrides.[schema].[table] | optional | (none) | String | You can specify a SQL statement to limit the data range of the snapshot. Note 1: The schema and table must be specified in the SQL statement, and the SQL should conform to the syntax of the data source. For example: 'debezium.snapshot.select.statement.overrides.schema.table' = 'select * from schema.table where 1 != 1'. Note 2: The Flink SQL client does not support submitting statements whose content contains functions with single quotation marks. For example, the following is not supported: 'debezium.snapshot.select.statement.overrides.schema.table' = 'select * from schema.table where to_char(rq, 'yyyy-MM-dd')'. |
scan.incremental.snapshot.enabled | optional | false | Boolean | Incremental snapshot is a new mechanism for reading the snapshot of a table. Compared to the old snapshot mechanism, incremental snapshot has several advantages: (1) the source can read in parallel during snapshot reading, (2) the source can perform checkpoints at chunk granularity during snapshot reading, (3) the source doesn't need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading. Please see the Incremental Snapshot Reading section for more detailed information. |
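
As a rough illustration of how the options above fit together, the following Flink SQL DDL is a minimal sketch. The table `shipments`, its columns, the connection values, and the slot name `flink_shipments` are hypothetical placeholders, not values taken from this document. The last two properties show one way the snapshot override pair described above might be used to limit the snapshot to part of the table.

```sql
-- Hypothetical table: column names and all connection values are placeholders.
CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',                          -- optional, 5432 is the default
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments',
  'slot.name' = 'flink_shipments',          -- lower-case letters, numbers, underscores only
  'decoding.plugin.name' = 'pgoutput',      -- one of the supported plug-in values
  -- Pass-through Debezium properties: snapshot only part of the table.
  'debezium.snapshot.select.statement.overrides' = 'public.shipments',
  'debezium.snapshot.select.statement.overrides.public.shipments' =
    'SELECT * FROM public.shipments WHERE shipment_id > 1000'
);
```

The override statement deliberately avoids single-quoted literals, since Note 2 above says the Flink SQL client cannot submit statements containing them.
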

Option | Required | Default | Type | Description |
---|---|---|---|---|
scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of the table snapshot; captured tables are split into multiple chunks when reading the table snapshot. |
scan.startup.mode | optional | initial | String | Optional startup mode for Postgres CDC consumer, valid enumerations are "initial" and "latest-offset". Please see Startup Reading Position section for more detailed information. |
chunk-meta.group.size | optional | 1000 | Integer | The group size of chunk metadata; if the metadata size exceeds the group size, the metadata is divided into multiple groups. |
connect.timeout | optional | 30s | Duration | The maximum time that the connector should wait after trying to connect to the PostgreSQL database server before timing out. |
connect.pool.size | optional | 30 | Integer | The connection pool size. |
connect.max-retries | optional | 3 | Integer | The maximum number of times the connector should retry establishing a connection to the database server. |
scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size per poll when reading the table snapshot. |
scan.incremental.snapshot.chunk.key-column | optional | (none) | String | The chunk key of the table snapshot; captured tables are split into multiple chunks by the chunk key when reading the table snapshot. By default, the chunk key is the first column of the primary key. This column must be a column of the primary key. |
chunk-key.even-distribution.factor.lower-bound | optional | 0.05d | Double | The lower bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed or not. Table chunks are computed with the even-distribution optimization when the data distribution is even, and a splitting query is issued when it is uneven. The distribution factor can be calculated as (MAX(id) - MIN(id) + 1) / rowCount. |
chunk-key.even-distribution.factor.upper-bound | optional | 1000.0d | Double | The upper bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed or not. Table chunks are computed with the even-distribution optimization when the data distribution is even, and a splitting query is issued when it is uneven. The distribution factor can be calculated as (MAX(id) - MIN(id) + 1) / rowCount. |
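
To get a feel for the distribution factor described above, the formula can be evaluated directly against the source database. The query below is a sketch that assumes a hypothetical table `public.shipments` with a numeric chunk key `shipment_id`; it is run in PostgreSQL, not in Flink, and it is not necessarily the exact query the connector issues.

```sql
-- PostgreSQL query; table and column names are hypothetical.
-- Computes (MAX(id) - MIN(id) + 1) / rowCount for the chunk key.
SELECT
  (MAX(shipment_id) - MIN(shipment_id) + 1)::numeric
    / NULLIF(COUNT(*), 0) AS distribution_factor  -- NULL for an empty table
FROM public.shipments;
```

For example, 100 rows with keys 1 through 100 give a factor of 1, which lies between the default bounds of 0.05 and 1000.0. The same 100 rows spread across keys 1 through 1,000,000 give a factor of 10,000, which is above the default upper bound, so under the reading that the bounds delimit the even range, that table would be split by query instead.
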

Key | DataType | Description |
---|---|---|
table_name | STRING NOT NULL | Name of the table that contains the row. |
schema_name | STRING NOT NULL | Name of the schema that contains the row. |
database_name | STRING NOT NULL | Name of the database that contains the row. |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0. |
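
The metadata keys in the table above can be exposed as read-only columns using Flink SQL's `METADATA FROM ... VIRTUAL` column syntax. The DDL below is a minimal sketch: the table `products`, its physical columns, the connection values, and the slot name `flink_products` are hypothetical placeholders.

```sql
-- Hypothetical table; only the METADATA keys come from the table above.
CREATE TABLE products (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
  table_name STRING METADATA FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'products',
  'slot.name' = 'flink_products'
);
```

Declaring the columns as `VIRTUAL` keeps them out of the table's persisted schema, so they are only available when reading from the source.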