Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | 指定要使用的连接器,此处应为 mongodb-cdc . |
hosts | required | (none) | String | MongoDB 服务器的主机名和端口对的逗号分隔列表。 eg. localhost:27017,localhost:27018
|
username | optional | (none) | String | 连接到 MongoDB 时要使用的数据库用户的名称。 只有当 MongoDB 配置为使用身份验证时,才需要这样做。 |
password | optional | (none) | String | 连接到 MongoDB 时要使用的密码。 只有当 MongoDB 配置为使用身份验证时,才需要这样做。 |
database | optional | (none) | String | 要监视更改的数据库的名称。 如果未设置,则将捕获所有数据库。 该数据库还支持正则表达式来监视与正则表达式匹配的多个数据库。 |
collection | optional | (none) | String | 数据库中要监视更改的集合的名称。 如果未设置,则将捕获所有集合。 该集合还支持正则表达式来监视与完全限定的集合标识符匹配的多个集合。 |
connection.options | optional | (none) | String | 电流和分离 连接选项 of MongoDB. eg. replicaSet=test&connectTimeoutMS=300000
|
copy.existing | optional | true | Boolean | 是否从源集合复制现有数据。 |
copy.existing.queue.size | optional | 10240 | Integer | 复制数据时要使用的队列的最大大小。 |
batch.size | optional | 1024 | Integer | 光标批次大小。 |
poll.max.batch.size | optional | 1024 | Integer | 轮询新数据时,单个批处理中要包含的更改流文档的最大数量。 |
poll.await.time.ms | optional | 1000 | Integer | 在更改流上检查新结果之前等待的时间。 |
heartbeat.interval.ms | optional | 0 | Integer | 发送检测信号消息之间的时间长度(以毫秒为单位)。使用 0 禁用。 |
scan.incremental.snapshot.enabled | optional | false | Boolean | 是否启用增量快照。增量快照功能仅支持 MongoDB 4.0 之后的版本。 |
scan.incremental.snapshot.chunk.size.mb | optional | 64 | Integer | 增量快照的区块大小 mb。 |
Key | DataType | Description |
---|---|---|
database_name | STRING NOT NULL | 包含该行的数据库的名称。 |
collection_name | STRING NOT NULL | 包含该行的集合的名称。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 它指示在数据库中进行更改的时间。 如果记录是从表的快照而不是改变流中读取的,该值将始终为0。 |
BSON type | Flink SQL type |
---|---|
TINYINT | |
SMALLINT | |
Int | INT |
Long | BIGINT |
FLOAT | |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
DateTimestamp | DATE |
DateTimestamp | TIME |
Date | TIMESTAMP(3)TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0)TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex |
STRING |
BinData | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point : ROW<type STRING, coordinates ARRAY<DOUBLE>> Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> ... |