|
|
|
@ -252,14 +252,14 @@ Connector Options
|
|
|
|
|
<td>optional</td>
|
|
|
|
|
<td style="word-wrap: break-word;">false</td>
|
|
|
|
|
<td>Boolean</td>
|
|
|
|
|
<td>Whether enable parallelism snapshot.</td>
|
|
|
|
|
<td>Whether enable incremental snapshot. The incremental snapshot feature only supports after MongoDB 4.0.</td>
|
|
|
|
|
</tr>
|
|
|
|
|
<tr>
|
|
|
|
|
<td>scan.incremental.snapshot.chunk.size.mb</td>
|
|
|
|
|
<td>optional</td>
|
|
|
|
|
<td style="word-wrap: break-word;">64</td>
|
|
|
|
|
<td>Integer</td>
|
|
|
|
|
<td>The chunk size mb of parallelism snapshot.</td>
|
|
|
|
|
<td>The chunk size mb of incremental snapshot.</td>
|
|
|
|
|
</tr>
|
|
|
|
|
</tbody>
|
|
|
|
|
</table>
|
|
|
|
@ -398,8 +398,42 @@ public class MongoDBSourceExample {
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
**Note:** If database regex is used, `readAnyDatabase` role is required.
|
|
|
|
|
The MongoDB CDC incremental connector (after 2.3.0) can be used as the following shows:
|
|
|
|
|
```java
|
|
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
|
|
|
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
|
|
|
|
|
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
|
|
|
|
|
|
|
|
|
|
public class MongoDBIncrementalSourceExample {
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
|
|
MongoDBSource<String> mongoSource =
|
|
|
|
|
MongoDBSource.<String>builder()
|
|
|
|
|
.hosts("localhost:27017")
|
|
|
|
|
.databaseList("inventory") // set captured database, support regex
|
|
|
|
|
.collectionList("inventory.products", "inventory.orders") //set captured collections, support regex
|
|
|
|
|
.username("flink")
|
|
|
|
|
.password("flinkpw")
|
|
|
|
|
.deserializer(new JsonDebeziumDeserializationSchema())
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
|
|
|
// enable checkpoint
|
|
|
|
|
env.enableCheckpointing(3000);
|
|
|
|
|
// set the source parallelism to 2
|
|
|
|
|
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
|
|
|
|
|
.setParallelism(2)
|
|
|
|
|
.print()
|
|
|
|
|
.setParallelism(1);
|
|
|
|
|
|
|
|
|
|
env.execute("Print MongoDB Snapshot + Change Stream");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
**Note:**
|
|
|
|
|
- If database regex is used, `readAnyDatabase` role is required.
|
|
|
|
|
- The incremental snapshot feature only supports after MongoDB 4.0.
|
|
|
|
|
|
|
|
|
|
Data Type Mapping
|
|
|
|
|
----------------
|
|
|
|
|