[FLINK-37224][docs] Add the missing documents and parameters of MongoDB CDC

This closes #3895
pull/3910/head
Kunni 3 weeks ago committed by GitHub
parent 1fa762dd63
commit 6aeb5e8c9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -488,6 +488,62 @@ public class MongoDBIncrementalSourceExample {
- 如果使用数据库正则表达式,则需要 `readAnyDatabase` 角色。
- 增量快照功能仅支持 MongoDB 4.0 之后的版本。
### 完整的 Changelog
MongoDB 6.0 以及更高的版本支持发送变更流事件,其中包含文档的更新前和更新后的内容(或者说数据的前后镜像)。
- 前镜像是指被替换、更新或删除之前的文档。对于插入操作没有前镜像。
- 后镜像是指被替换、更新或删除之后的文档。对于删除操作没有后镜像。
MongoDB CDC 能够使用前镜像和后镜像来生成完整的变更日志流,包括插入、更新前、更新后和删除的数据行,从而避免了额外的 `ChangelogNormalize` 下游节点。
为了启用此功能,你需要满足以下条件:
- MongoDB 的版本必须为 6.0 或更高版本。
- 启用 `preAndPostImages` 功能。
```javascript
db.runCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: {
expireAfterSeconds: 'off' // replace with custom image expiration time
}
}
}
})
```
- 为希望监控的 collection 启用 `changeStreamPreAndPostImages` 功能:
```javascript
db.runCommand({
collMod: "<< collection name >>",
changeStreamPreAndPostImages: {
enabled: true
}
})
```
在 DataStream 中开启 MongoDB CDC 的 `scan.full-changelog` 功能:
```java
MongoDBSource.builder()
.scanFullChangelog(true)
...
.build()
```
或者使用 Flink SQL:
```SQL
CREATE TABLE mongodb_source (...) WITH (
'connector' = 'mongodb-cdc',
'scan.full-changelog' = 'true',
...
)
```
数据类型映射
----------------
[BSON](https://docs.mongodb.com/manual/reference/bson-types/) **二进制 JSON**的缩写是一种类似 JSON 格式的二进制编码序列,用于在 MongoDB 中存储文档和进行远程过程调用。

Loading…
Cancel
Save