[mongodb][docs] Add documentation for mongodb-cdc connector (#395)

pull/423/head
Jiabao Sun 3 years ago committed by GitHub
parent e867c75580
commit f9d17f472f

@ -9,7 +9,7 @@ The Flink CDC Connectors integrates Debezium as the engine to capture data chang
| --- | --- |
| MySQL | Database: 5.7, 8.0.x <br/>JDBC Driver: 8.0.16 |
| PostgreSQL | Database: 9.6, 10, 11, 12 <br/>JDBC Driver: 42.2.12|
| MongoDB | Database: 3.6, 4.x, 5.0 <br/>MongoDB Driver: 4.3.1|
## Supported Formats

@ -5,4 +5,5 @@
mysql-cdc
postgres-cdc
mongodb-cdc
```

@ -0,0 +1,451 @@
# MongoDB CDC Connector
The MongoDB CDC connector allows for reading snapshot data and incremental data from MongoDB. This document describes how to set up the MongoDB CDC connector to run SQL queries against MongoDB.
Dependencies
------------
In order to set up the MongoDB CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
### Maven dependency
<!-- fixme: correct the version -->
```
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mongodb-cdc</artifactId>
  <version>2.1.0</version>
</dependency>
```
### SQL Client JAR
<!-- fixme: correct the version -->
Download [flink-sql-connector-mongodb-cdc-2.1.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.1.0/flink-sql-connector-mongodb-cdc-2.1.0.jar) and put it under `<FLINK_HOME>/lib/`.
Setup MongoDB
----------------
### Availability
- MongoDB version

  MongoDB version >= 3.6 <br>
  We use the [change streams](https://docs.mongodb.com/manual/changeStreams/) feature (new in version 3.6) to capture change data.

- Cluster Deployment

  [Replica sets](https://docs.mongodb.com/manual/replication/) or [sharded clusters](https://docs.mongodb.com/manual/sharding/) are required.

- Storage Engine

  The [WiredTiger](https://docs.mongodb.com/manual/core/wiredtiger/#std-label-storage-wiredtiger) storage engine is required.

- [Replica set protocol version](https://docs.mongodb.com/manual/reference/replica-configuration/#mongodb-rsconf-rsconf.protocolVersion)

  Replica set protocol version 1 [(pv1)](https://docs.mongodb.com/manual/reference/replica-configuration/#mongodb-rsconf-rsconf.protocolVersion) is required. <br>
  Starting in version 4.0, MongoDB only supports pv1. pv1 is the default for all new replica sets created with MongoDB 3.2 or later.

- Privileges

  The `changeStream` and `read` privileges are required by the MongoDB Kafka Connector.

  You can use the following example for simple authorization.<br>
  For more detailed authorization, please refer to [MongoDB Database User Roles](https://docs.mongodb.com/manual/reference/built-in-roles/#database-user-roles).
```javascript
use admin;
db.createUser({
  user: "flinkuser",
  pwd: "flinkpw",
  roles: [
    { role: "read", db: "admin" },            // read role includes changeStream privilege
    { role: "readAnyDatabase", db: "admin" }  // for snapshot reading
  ]
});
```
How to create a MongoDB CDC table
----------------
The MongoDB CDC table can be defined as follows:
```sql
-- register a MongoDB table 'products' in Flink SQL
CREATE TABLE products (
  _id STRING, -- must be declared
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

-- read snapshot and change events from products collection
SELECT * FROM products;
```
**Note that**

- MongoDB's change event records don't contain the update-before message, so we can only convert them to Flink's UPSERT changelog stream.
- An upsert stream requires a unique key, so we must declare `_id` as the primary key.
- We can't declare any other column as the primary key, because a delete operation carries only the `_id` and the sharding key, not the other fields.
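Because the connector emits an upsert changelog keyed on `_id`, its output can be written directly into any upsert-capable sink. Below is a minimal sketch that mirrors the collection into Kafka; it assumes the `upsert-kafka` connector is on the classpath, and the topic name and broker address are placeholders.

```sql
-- mirror the products collection into an upsert sink keyed by `_id`
CREATE TABLE products_mirror (
  _id STRING,
  name STRING,
  weight DECIMAL(10,3),
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',                       -- any upsert-capable sink works
  'topic' = 'products_mirror',                        -- placeholder topic
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO products_mirror
SELECT _id, name, weight FROM products;
```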
Connector Options
----------------
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-left" style="width: 8%">Required</th>
<th class="text-left" style="width: 7%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 50%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>connector</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what connector to use, here should be <code>mongodb-cdc</code>.</td>
</tr>
<tr>
<td>hosts</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The comma-separated list of hostname and port pairs of the MongoDB servers.<br>
eg. <code>localhost:27017,localhost:27018</code>
</td>
</tr>
<tr>
<td>username</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the database user to be used when connecting to MongoDB.<br>
This is required only when MongoDB is configured to use authentication.
</td>
</tr>
<tr>
<td>password</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password to be used when connecting to MongoDB.<br>
This is required only when MongoDB is configured to use authentication.
</td>
</tr>
<tr>
<td>database</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the database to watch for changes.</td>
</tr>
<tr>
<td>collection</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the collection in the database to watch for changes.</td>
</tr>
<tr>
<td>connection.options</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The ampersand-separated <a href="https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options">connection options</a> of MongoDB. eg. <br>
<code>replicaSet=test&connectTimeoutMS=300000</code>
</td>
</tr>
<tr>
<td>errors.tolerance</td>
<td>optional</td>
<td style="word-wrap: break-word;">none</td>
<td>String</td>
<td>Whether to continue processing messages if an error is encountered.
Accept <code>none</code> or <code>all</code>.
When set to <code>none</code>, the connector reports an error and blocks further processing of the rest of the records
when it encounters an error. When set to <code>all</code>, the connector silently ignores any bad messages.
</td>
</tr>
<tr>
<td>errors.log.enable</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether details of failed operations should be written to the log file.</td>
</tr>
<tr>
<td>copy.existing</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to copy existing data from source collections.</td>
</tr>
<tr>
<td>copy.existing.pipeline</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td> An array of JSON objects describing the pipeline operations to run when copying existing data.<br>
This can improve the use of indexes by the copying manager and make copying more efficient.
eg. <code>[{"$match": {"closed": "false"}}]</code> ensures that
only documents in which the closed field is set to false are copied.
</td>
</tr>
<tr>
<td>copy.existing.max.threads</td>
<td>optional</td>
<td style="word-wrap: break-word;">Processors Count</td>
<td>Integer</td>
<td>The number of threads to use when performing the data copy.</td>
</tr>
<tr>
<td>copy.existing.queue.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">16000</td>
<td>Integer</td>
<td>The max size of the queue to use when copying data.</td>
</tr>
<tr>
<td>poll.max.batch.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>Maximum number of change stream documents to include in a single batch when polling for new data.</td>
</tr>
<tr>
<td>poll.await.time.ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">1500</td>
<td>Integer</td>
<td>The amount of time to wait before checking for new results on the change stream.</td>
</tr>
<tr>
<td>heartbeat.interval.ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>The length of time in milliseconds between sending heartbeat messages. Use 0 to disable.</td>
</tr>
</tbody>
</table>
</div>
Note: it is highly recommended to set `heartbeat.interval.ms` to a proper value larger than 0 **if the collection changes slowly**.
The heartbeat event can push the `resumeToken` forward, which prevents the `resumeToken` from expiring when we recover the Flink job from a checkpoint or savepoint.
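For example, the option can be set directly in the table's WITH clause; the sketch below is only illustrative, and the host, database, and interval value are placeholders:

```sql
CREATE TABLE products (
  _id STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'inventory',
  'collection' = 'products',
  'heartbeat.interval.ms' = '60000'  -- send a heartbeat every 60 seconds to keep the resume token fresh
);
```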
Features
--------
### Exactly-Once Processing
The MongoDB CDC connector is a Flink Source connector which reads a database snapshot first and then continues to read change stream events with **exactly-once processing**, even when failures happen.
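Exactly-once recovery relies on Flink checkpointing being enabled for the job; for example, in the SQL client (the 3-second interval is only illustrative):

```sql
-- enable periodic checkpoints so the source can resume from the recorded resume token after a failure
SET execution.checkpointing.interval = 3s;
```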
### Snapshot When Startup Or Not
The config option `copy.existing` specifies whether to take a snapshot when the MongoDB CDC consumer starts up. <br>Defaults to `true`.
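To skip the initial snapshot and read only the changes that happen after the job starts, set the option to `false`; a minimal sketch with illustrative connection settings:

```sql
CREATE TABLE products (
  _id STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'inventory',
  'collection' = 'products',
  'copy.existing' = 'false'  -- start from the latest change stream position instead of copying existing data
);
```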
### Snapshot Data Filters
The config option `copy.existing.pipeline` describes the filters applied when copying existing data.<br>
This can filter out unneeded data and improve the use of indexes by the copying manager.
In the following example, the `$match` aggregation operator ensures that only documents in which the closed field is set to false are copied.
```
copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
```
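In Flink SQL the pipeline is passed as a string option in the WITH clause; a minimal sketch where the table, collection, and field names are illustrative:

```sql
CREATE TABLE orders (
  _id STRING,
  closed STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'inventory',
  'collection' = 'orders',
  'copy.existing.pipeline' = '[ { "$match": { "closed": "false" } } ]'  -- only copy open orders during snapshot
);
```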
### Change Streams
We integrate [MongoDB's official Kafka Connector](https://docs.mongodb.com/kafka-connector/current/kafka-source/) to read snapshot and change events from MongoDB, driven by Debezium's `EmbeddedEngine`.
Debezium's `EmbeddedEngine` provides a mechanism for running a single Kafka Connect `SourceConnector` within an application's process, and it can properly drive any standard Kafka Connect `SourceConnector`, even one not provided by Debezium.
We chose **MongoDB's official Kafka Connector** instead of **Debezium's MongoDB Connector** because they use different change data capture mechanisms:
- Debezium's MongoDB Connector reads the `oplog.rs` collection of each replica set's master node.
- MongoDB's Kafka Connector subscribes to MongoDB's `Change Stream`.

MongoDB's `oplog.rs` collection doesn't keep the changed record's update-before state, so it's hard to extract the full document state from a single `oplog.rs` record and convert it to a changelog stream accepted by Flink (Insert Only, Upsert, All).
Additionally, MongoDB 5 (released in July 2021) changed the oplog format, so the current Debezium connector cannot be used with it.
**Change Stream** is a feature introduced in MongoDB 3.6 for replica sets and sharded clusters that allows applications to access real-time data changes without the complexity and risk of tailing the oplog.<br>
Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them.
**Lookup Full Document for Update Operations** is a **Change Stream** feature that can configure the change stream to return the most current majority-committed version of the updated document. Thanks to this feature, we can easily collect the latest full document and convert the changelog to Flink's **Upsert Changelog Stream**.
By the way, Debezium's MongoDB change streams exploration, mentioned in [DBZ-435](https://issues.redhat.com/browse/DBZ-435), is on the roadmap.<br>
If it is completed, we can consider integrating both kinds of source connectors for users to choose from.
### DataStream Source
The MongoDB CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;

public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("localhost:27017")
                .username("flink")
                .password("flinkpw")
                .database("inventory")
                .collection("products")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
           .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}
```
Data Type Mapping
----------------
[BSON](https://docs.mongodb.com/manual/reference/bson-types/), short for **Binary JSON**, is a binary-encoded serialization of a JSON-like format used to store documents and make remote procedure calls in MongoDB.
[Flink SQL Data Type](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/) is similar to the SQL standard's data type terminology and describes the logical type of a value in the table ecosystem. It can be used to declare input and/or output types of operations.
In order to enable Flink SQL to process data from heterogeneous data sources, the data types of those sources need to be uniformly converted to Flink SQL data types.
The following table shows the mapping between BSON types and Flink SQL types.
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">BSON type</th>
<th class="text-left">Flink SQL type</th>
</tr>
</thead>
<tbody>
<tr>
<td></td>
<td>TINYINT</td>
</tr>
<tr>
<td></td>
<td>SMALLINT</td>
</tr>
<tr>
<td>Int</td>
<td>INT</td>
</tr>
<tr>
<td>Long</td>
<td>BIGINT</td>
</tr>
<tr>
<td></td>
<td>FLOAT</td>
</tr>
<tr>
<td>Double</td>
<td>DOUBLE</td>
</tr>
<tr>
<td>Decimal128</td>
<td>DECIMAL(p, s)</td>
</tr>
<tr>
<td>Boolean</td>
<td>BOOLEAN</td>
</tr>
<tr>
<td>Date<br>Timestamp</td>
<td>DATE</td>
</tr>
<tr>
<td>Date<br>Timestamp</td>
<td>TIME</td>
</tr>
<tr>
<td>Date</td>
<td>TIMESTAMP(3)<br>TIMESTAMP_LTZ(3)</td>
</tr>
<tr>
<td>Timestamp</td>
<td>TIMESTAMP(0)<br>TIMESTAMP_LTZ(0)</td>
</tr>
<tr>
<td>
String<br>
ObjectId<br>
UUID<br>
Symbol<br>
MD5<br>
JavaScript<br>
Regex</td>
<td>STRING</td>
</tr>
<tr>
<td>BinData</td>
<td>BYTES</td>
</tr>
<tr>
<td>Object</td>
<td>ROW</td>
</tr>
<tr>
<td>Array</td>
<td>ARRAY</td>
</tr>
<tr>
<td>DBPointer</td>
<td>ROW&lt;$ref STRING, $id STRING&gt;</td>
</tr>
<tr>
<td>
<a href="https://docs.mongodb.com/manual/reference/geojson/">GeoJSON</a>
</td>
<td>
Point : ROW&lt;type STRING, coordinates ARRAY&lt;DOUBLE&gt;&gt;<br>
Line : ROW&lt;type STRING, coordinates ARRAY&lt;ARRAY&lt;DOUBLE&gt;&gt;&gt;<br>
...
</td>
</tr>
</tbody>
</table>
</div>
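To illustrate how a nested document maps onto these types, here is a hedged sketch in which the document, field names, and values are purely illustrative:

```sql
-- Illustrative source document:
--   { "_id": ObjectId("..."), "name": "scooter", "weight": NumberDecimal("3.140"),
--     "tags": ["wheels", "outdoor"],
--     "price": { "amount": NumberDecimal("119.90"), "currency": "USD" },
--     "created_at": ISODate("2021-09-01T08:00:00Z") }
CREATE TABLE products_typed (
  _id STRING,                                        -- ObjectId   -> STRING
  name STRING,                                       -- String     -> STRING
  weight DECIMAL(10,3),                              -- Decimal128 -> DECIMAL(p, s)
  tags ARRAY<STRING>,                                -- Array      -> ARRAY
  price ROW<amount DECIMAL(10,2), currency STRING>,  -- Object     -> ROW
  created_at TIMESTAMP_LTZ(3),                       -- Date       -> TIMESTAMP_LTZ(3)
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'inventory',
  'collection' = 'products'
);
```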
Reference
--------
- [MongoDB Kafka Connector](https://docs.mongodb.com/kafka-connector/current/kafka-source/)
- [Change Streams](https://docs.mongodb.com/manual/changeStreams/)
- [Replication](https://docs.mongodb.com/manual/replication/)
- [Sharding](https://docs.mongodb.com/manual/sharding/)
- [Database User Roles](https://docs.mongodb.com/manual/reference/built-in-roles/#database-user-roles)
- [WiredTiger](https://docs.mongodb.com/manual/core/wiredtiger/#std-label-storage-wiredtiger)
- [Replica set protocol](https://docs.mongodb.com/manual/reference/replica-configuration/#mongodb-rsconf-rsconf.protocolVersion)
- [Connection String Options](https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options)
- [BSON Types](https://docs.mongodb.com/manual/reference/bson-types/)
- [Flink DataTypes](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/)

@ -5,4 +5,6 @@
mysql-postgres-tutorial
mysql-postgres-tutorial-zh
mongodb-tutorial
mongodb-tutorial-zh
```

@ -0,0 +1,213 @@
# Streaming ETL from MongoDB to Elasticsearch (Chinese)
1. Download `docker-compose.yml`:
```
version: '2.1'
services:
  mongo:
    image: "mongo:4.0-xenial"
    command: --replSet rs0 --smallfiles --oplogSize 128
    ports:
      - "27017:27017"
    environment:
      - MONGO_INITDB_ROOT_USERNAME=mongouser
      - MONGO_INITDB_ROOT_PASSWORD=mongopw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
```
2. Enter the MongoDB container and initialize the replica set and data:
```
docker-compose exec mongo /usr/bin/mongo -u mongouser -p mongopw
```
```javascript
// 1. initialize the replica set
rs.initiate();
rs.status();
// 2. switch to the database
use mgdb;
// 3. initialize data
db.orders.insertMany([
  {
    order_id: 101,
    order_date: ISODate("2020-07-30T10:08:22.001Z"),
    customer_id: 1001,
    price: NumberDecimal("50.50"),
    product: {
      name: 'scooter',
      description: 'Small 2-wheel scooter'
    },
    order_status: false
  },
  {
    order_id: 102,
    order_date: ISODate("2020-07-30T10:11:09.001Z"),
    customer_id: 1002,
    price: NumberDecimal("15.00"),
    product: {
      name: 'car battery',
      description: '12V car battery'
    },
    order_status: false
  },
  {
    order_id: 103,
    order_date: ISODate("2020-07-30T12:00:30.001Z"),
    customer_id: 1003,
    price: NumberDecimal("25.25"),
    product: {
      name: 'hammer',
      description: '16oz carpenter hammer'
    },
    order_status: false
  }
]);

db.customers.insertMany([
  {
    customer_id: 1001,
    name: 'Jark',
    address: 'Hangzhou'
  },
  {
    customer_id: 1002,
    name: 'Sally',
    address: 'Beijing'
  },
  {
    customer_id: 1003,
    name: 'Edward',
    address: 'Shanghai'
  }
]);
```
3. Download the following JAR packages to `<FLINK_HOME>/lib/`:
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-mongodb-cdc-2.1.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.1.0/flink-sql-connector-mongodb-cdc-2.1.0.jar)
4. Then launch a Flink cluster and start the Flink SQL CLI:
```sql
-- Flink SQL
-- set the checkpoint interval to 3 seconds
Flink SQL> SET execution.checkpointing.interval = 3s;

-- set the local time zone to Asia/Shanghai
Flink SQL> SET table.local-time-zone = Asia/Shanghai;

Flink SQL> CREATE TABLE orders (
    _id STRING,
    order_id INT,
    order_date TIMESTAMP_LTZ(3),
    customer_id INT,
    price DECIMAL(10, 5),
    product ROW<name STRING, description STRING>,
    order_status BOOLEAN,
    PRIMARY KEY (_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',
    'username' = 'mongouser',
    'password' = 'mongopw',
    'database' = 'mgdb',
    'collection' = 'orders'
  );

Flink SQL> CREATE TABLE customers (
    _id STRING,
    customer_id INT,
    name STRING,
    address STRING,
    PRIMARY KEY (_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',
    'username' = 'mongouser',
    'password' = 'mongopw',
    'database' = 'mgdb',
    'collection' = 'customers'
  );

Flink SQL> CREATE TABLE enriched_orders (
    order_id INT,
    order_date TIMESTAMP_LTZ(3),
    customer_id INT,
    price DECIMAL(10, 5),
    product ROW<name STRING, description STRING>,
    order_status BOOLEAN,
    customer_name STRING,
    customer_address STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'enriched_orders'
  );

Flink SQL> INSERT INTO enriched_orders
  SELECT o.order_id,
         o.order_date,
         o.customer_id,
         o.price,
         o.product,
         o.order_status,
         c.name,
         c.address
  FROM orders AS o
  LEFT JOIN customers AS c ON o.customer_id = c.customer_id;
```
5. Make some changes in MongoDB, then check the result in Elasticsearch:
```javascript
db.orders.insert({
  order_id: 104,
  order_date: ISODate("2020-07-30T12:00:30.001Z"),
  customer_id: 1004,
  price: NumberDecimal("25.25"),
  product: {
    name: 'rocks',
    description: 'box of assorted rocks'
  },
  order_status: false
});

db.customers.insert({
  customer_id: 1004,
  name: 'Jacob',
  address: 'Shanghai'
});

db.orders.updateOne(
  { order_id: 104 },
  { $set: { order_status: true } }
);

db.orders.deleteOne(
  { order_id: 104 }
);
```

@ -0,0 +1,213 @@
# Streaming ETL from MongoDB to Elasticsearch
1. Create a `docker-compose.yml` file with the following contents:
```
version: '2.1'
services:
  mongo:
    image: "mongo:4.0-xenial"
    command: --replSet rs0 --smallfiles --oplogSize 128
    ports:
      - "27017:27017"
    environment:
      - MONGO_INITDB_ROOT_USERNAME=mongouser
      - MONGO_INITDB_ROOT_PASSWORD=mongopw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
```
2. Enter the MongoDB container and initialize the replica set and data:
```
docker-compose exec mongo /usr/bin/mongo -u mongouser -p mongopw
```
```javascript
// 1. initialize replica set
rs.initiate();
rs.status();
// 2. switch database
use mgdb;
// 3. initialize data
db.orders.insertMany([
  {
    order_id: 101,
    order_date: ISODate("2020-07-30T10:08:22.001Z"),
    customer_id: 1001,
    price: NumberDecimal("50.50"),
    product: {
      name: 'scooter',
      description: 'Small 2-wheel scooter'
    },
    order_status: false
  },
  {
    order_id: 102,
    order_date: ISODate("2020-07-30T10:11:09.001Z"),
    customer_id: 1002,
    price: NumberDecimal("15.00"),
    product: {
      name: 'car battery',
      description: '12V car battery'
    },
    order_status: false
  },
  {
    order_id: 103,
    order_date: ISODate("2020-07-30T12:00:30.001Z"),
    customer_id: 1003,
    price: NumberDecimal("25.25"),
    product: {
      name: 'hammer',
      description: '16oz carpenter hammer'
    },
    order_status: false
  }
]);

db.customers.insertMany([
  {
    customer_id: 1001,
    name: 'Jark',
    address: 'Hangzhou'
  },
  {
    customer_id: 1002,
    name: 'Sally',
    address: 'Beijing'
  },
  {
    customer_id: 1003,
    name: 'Edward',
    address: 'Shanghai'
  }
]);
```
3. Download the following JAR packages to `<FLINK_HOME>/lib/`:
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar)
- [flink-sql-connector-mongodb-cdc-2.1.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.1.0/flink-sql-connector-mongodb-cdc-2.1.0.jar)
4. Launch a Flink cluster, then start a Flink SQL CLI and execute the following SQL statements in it:
```sql
-- Flink SQL
-- checkpoint every 3000 milliseconds
Flink SQL> SET execution.checkpointing.interval = 3s;
-- set local time zone as Asia/Shanghai
Flink SQL> SET table.local-time-zone = Asia/Shanghai;
Flink SQL> CREATE TABLE orders (
    _id STRING,
    order_id INT,
    order_date TIMESTAMP_LTZ(3),
    customer_id INT,
    price DECIMAL(10, 5),
    product ROW<name STRING, description STRING>,
    order_status BOOLEAN,
    PRIMARY KEY (_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',
    'username' = 'mongouser',
    'password' = 'mongopw',
    'database' = 'mgdb',
    'collection' = 'orders'
  );

Flink SQL> CREATE TABLE customers (
    _id STRING,
    customer_id INT,
    name STRING,
    address STRING,
    PRIMARY KEY (_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',
    'username' = 'mongouser',
    'password' = 'mongopw',
    'database' = 'mgdb',
    'collection' = 'customers'
  );

Flink SQL> CREATE TABLE enriched_orders (
    order_id INT,
    order_date TIMESTAMP_LTZ(3),
    customer_id INT,
    price DECIMAL(10, 5),
    product ROW<name STRING, description STRING>,
    order_status BOOLEAN,
    customer_name STRING,
    customer_address STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'enriched_orders'
  );

Flink SQL> INSERT INTO enriched_orders
  SELECT o.order_id,
         o.order_date,
         o.customer_id,
         o.price,
         o.product,
         o.order_status,
         c.name,
         c.address
  FROM orders AS o
  LEFT JOIN customers AS c ON o.customer_id = c.customer_id;
```
5. Make some changes in MongoDB, then check the result in Elasticsearch:
```javascript
db.orders.insert({
  order_id: 104,
  order_date: ISODate("2020-07-30T12:00:30.001Z"),
  customer_id: 1004,
  price: NumberDecimal("25.25"),
  product: {
    name: 'rocks',
    description: 'box of assorted rocks'
  },
  order_status: false
});

db.customers.insert({
  customer_id: 1004,
  name: 'Jacob',
  address: 'Shanghai'
});

db.orders.updateOne(
  { order_id: 104 },
  { $set: { order_status: true } }
);

db.orders.deleteOne(
  { order_id: 104 }
);
```