[doc] Add documents for flink-cdc

pull/294/head
Leonard Xu 4 years ago committed by Leonard Xu
parent 6d71dd221e
commit 98835f3b89

@ -3,113 +3,4 @@
Flink CDC Connectors is a set of source connectors for Apache Flink, ingesting changes from different databases using change data capture (CDC).
Flink CDC Connectors integrates Debezium as the engine to capture data changes, so it can fully leverage the ability of Debezium. See more about [Debezium](https://github.com/debezium/debezium).
This README is meant as a brief walkthrough of the core features of Flink CDC Connectors. For fully detailed documentation, please see the [Documentation](https://github.com/ververica/flink-cdc-connectors/wiki).
## Supported (Tested) Connectors
| Database | Version |
| --- | --- |
| MySQL | Database: 5.7, 8.0.x <br/>JDBC Driver: 8.0.16 |
| PostgreSQL | Database: 9.6, 10, 11, 12 <br/>JDBC Driver: 42.2.12|
## Features
1. Supports reading database snapshots and continues to read binlogs with **exactly-once processing** even when failures happen.
2. CDC connectors for the DataStream API: users can consume changes from multiple databases and tables in a single job without deploying Debezium and Kafka.
3. CDC connectors for the Table/SQL API: users can use SQL DDL to create a CDC source to monitor changes on a single table.
## Usage for Table/SQL API
We need several steps to set up a Flink cluster with the provided connectors.
1. Set up a Flink cluster with version 1.12+ and Java 8+ installed.
2. Download the connector SQL jars from the [Download](https://github.com/ververica/flink-cdc-connectors/wiki/Downloads) page (or [build them yourself](#building-from-source)).
3. Put the downloaded jars under `FLINK_HOME/lib/`.
4. Restart the Flink cluster.
The following example shows how to create a MySQL CDC source in the [Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html) and execute queries on it.
```sql
-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'inventory',
'table-name' = 'products'
);
-- read snapshot and binlog data from mysql, do some transformations, and show the results on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;
```
## Usage for DataStream API
Include the following Maven dependency (available through Maven Central):
```
<dependency>
  <groupId>com.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>2.0.0</version>
</dependency>
```
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.MySqlSource;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySqlSource.<String>builder()
      .hostname("localhost")
      .port(3306)
      .databaseList("inventory") // monitor all tables under inventory database
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}
```
## Building from source
Prerequisites:
- git
- Maven
- At least Java 8
```
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests
```
Flink CDC Connectors is now available at your local `.m2` repository.
## License
The code in this repository is licensed under the [Apache Software License 2](https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE).
## Contributing
Flink CDC Connectors welcomes anyone who wants to help out in any way, whether that includes reporting problems, helping with documentation, or contributing code changes to fix bugs, add tests, or implement new features. You can report problems or request features in the [GitHub Issues](https://github.com/ververica/flink-cdc-connectors/issues).
To get started, please see https://ververica.github.io/flink-cdc-connectors/

@ -1,17 +1,32 @@
# Flink CDC Connectors
# About Flink CDC
Flink CDC Connectors is a set of source connectors for Apache Flink, ingesting changes from different databases using change data capture (CDC).
Flink CDC Connectors is a set of source connectors for <a href="https://flink.apache.org/">Apache Flink</a>, ingesting changes from different databases using change data capture (CDC).
Flink CDC Connectors integrates Debezium as the engine to capture data changes, so it can fully leverage the ability of Debezium. See more about [Debezium](https://github.com/debezium/debezium).
This README is meant as a brief walkthrough of the core features of Flink CDC Connectors. For fully detailed documentation, please see the [Documentation](https://github.com/ververica/flink-cdc-connectors/wiki).
## Supported (Tested) Connectors
## Supported Connectors
| Database | Version |
| --- | --- |
| MySQL | Database: 5.7, 8.0.x <br/>JDBC Driver: 8.0.16 |
| PostgreSQL | Database: 9.6, 10, 11, 12 <br/>JDBC Driver: 42.2.12|
## Supported Formats
| Format | Supported Connector | Flink Version |
| --- | --- | --- |
| <a href="https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format">Changelog Json</a> | <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/connectors/kafka.html">Apache Kafka</a> | 1.11+ |
## Supported Flink Versions
The following table shows the version mapping between Flink CDC Connectors and Flink:
| Flink CDC Connector Version | Flink Version |
| --- | --- |
|1.0.0 | 1.11.* |
|1.1.0 | 1.11.* |
|1.2.0 | 1.12.* |
|1.3.0 | 1.12.* |
|1.4.0 | 1.13.* |
## Features
1. Supports reading database snapshots and continues to read binlogs with **exactly-once processing** even when failures happen.

@ -0,0 +1,8 @@
# Connectors
```{toctree}
:maxdepth: 2
mysql-cdc
postgres-cdc
```

@ -1,26 +1,5 @@
# MySQL CDC Connector
* [Dependencies](#dependencies)
* [Maven dependency](#maven-dependency)
* [SQL Client JAR](#sql-client-jar)
* [Setup MySQL server](#setup-mysql-server)
* [Notes](#notes)
* [How MySQL CDC source works](#how-mysql-cdc-source-works)
* [Grant RELOAD permission for MySQL user](#grant-reload-permission-for-mysql-user)
* [The global read lock](#the-global-read-lock-flush-tables-with-read-lock)
* [Set a different SERVER ID for each job](#set-a-differnet-server-id-for-each-job)
* [Can't perform checkpoint during scanning database tables](#cant-perform-checkpoint-during-scaning-database-tables)
* [Setting up MySQL session timeouts](#setting-up-mysql-session-timeouts)
* [How to create a MySQL CDC table](#how-to-create-a-mysql-cdc-table)
* [Connector Options](#connector-options)
* [Features](#features)
* [Exactly-Once Processing](#exactly-once-processing)
* [Startup Reading Position](#startup-reading-position)
* [Single Thread Reading](#single-thread-reading)
* [DataStream Source](#datastream-source)
* [Data Type Mapping](#data-type-mapping)
* [FAQ](#faq)
The MySQL CDC connector allows for reading snapshot data and incremental data from a MySQL database. This document describes how to set up the MySQL CDC connector to run SQL queries against MySQL databases.
Dependencies
@ -34,13 +13,13 @@ In order to setup the MySQL CDC connector, the following table provides dependen
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.1.0</version>
<version>1.4.0</version>
</dependency>
```
### SQL Client JAR
Download [flink-sql-connector-mysql-cdc-1.1.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.1.0/flink-sql-connector-mysql-cdc-1.1.0.jar) and put it under `<FLINK_HOME>/lib/`.
Download [flink-sql-connector-mysql-cdc-1.4.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar) and put it under `<FLINK_HOME>/lib/`.
Setup MySQL server
----------------
@ -136,68 +115,69 @@ SELECT * FROM orders;
Connector Options
----------------
<table class="table table-bordered">
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-left" style="width: 10%">Option</th>
<th class="text-left" style="width: 8%">Required</th>
<th class="text-left" style="width: 7%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 50%">Description</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>connector</h5></td>
<td>connector</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what connector to use, here should be <code>'mysql-cdc'</code>.</td>
</tr>
<tr>
<td><h5>hostname</h5></td>
<td>hostname</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>IP address or hostname of the MySQL database server.</td>
</tr>
<tr>
<td><h5>username</h5></td>
<td>username</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the MySQL user to use when connecting to the MySQL database server.</td>
</tr>
<tr>
<td><h5>password</h5></td>
<td>password</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password to use when connecting to the MySQL database server.</td>
</tr>
<tr>
<td><h5>database-name</h5></td>
<td>database-name</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Database name of the MySQL server to monitor. The database-name also supports regular expressions to monitor multiple databases that match the regular expression.</td>
</tr>
<tr>
<td><h5>table-name</h5></td>
<td>table-name</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables that match the regular expression.</td>
</tr>
<tr>
<td><h5>port</h5></td>
<td>port</td>
<td>optional</td>
<td style="word-wrap: break-word;">3306</td>
<td>Integer</td>
<td>Integer port number of the MySQL database server.</td>
</tr>
<tr>
<td><h5>server-id</h5></td>
<td>server-id</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
@ -206,7 +186,7 @@ Connector Options
By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.</td>
</tr>
<tr>
<td><h5>scan.startup.mode</h5></td>
<td>scan.startup.mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
@ -215,7 +195,7 @@ Connector Options
Please see the <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
</tr>
<tr>
<td><h5>server-time-zone</h5></td>
<td>server-time-zone</td>
<td>optional</td>
<td style="word-wrap: break-word;">UTC</td>
<td>String</td>
@ -224,7 +204,8 @@ Connector Options
See more <a href="https://debezium.io/documentation/reference/1.2/connectors/mysql.html#_temporal_values">here</a>.</td>
</tr>
<tr>
<td><h5>debezium.min.row.count.to.stream.results</h5></td>
<td>debezium.min.row.
count.to.stream.results</td>
<td>optional</td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
@ -232,15 +213,15 @@ Connector Options
During a snapshot operation, the connector will query each included table to produce a read event for all rows in that table. This parameter determines whether the MySQL connection will pull all results for a table into memory (which is fast but requires large amounts of memory), or whether the results will instead be streamed (can be slower, but will work for very large tables). The value specifies the minimum number of rows a table must contain before the connector will stream results, and defaults to 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot.</td>
</tr>
<tr>
<td><h5>debezium.snapshot.fetch.size</h5></td>
<td>debezium.snapshot.
fetch.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Specifies the maximum number of rows that should be read in one go from each table while taking a snapshot. The connector will read the table contents in multiple batches of this size.</td>
</tr>
<tr>
<td><h5>debezium.*</h5></td>
<td>debezium.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@ -250,6 +231,7 @@ During a snapshot operation, the connector will query each included table to pro
</tr>
</tbody>
</table>
</div>
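For quick reference, here is a sketch of a `mysql-cdc` table definition that combines several of the optional settings above; the host, credentials, database, table, and column names are placeholders and should be adapted to your setup.
```sql
-- A sketch combining the optional settings described above (placeholder names).
CREATE TABLE orders_cdc (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'orders',
  'server-id' = '5401',                                -- explicit server id instead of a random one
  'scan.startup.mode' = 'initial',                     -- snapshot first, then binlog (the default)
  'server-time-zone' = 'UTC',                          -- session time zone of the MySQL server
  'debezium.min.row.count.to.stream.results' = '1000'  -- a Debezium property passed through via the debezium.* prefix
);
```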
Features
--------
@ -308,7 +290,8 @@ public class MySqlBinlogSourceExample {
Data Type Mapping
----------------
<table class="table table-bordered">
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left"><a href="https://dev.mysql.com/doc/man/8.0/en/data-types.html">MySQL type</a></th>
@ -317,104 +300,105 @@ Data Type Mapping
</thead>
<tbody>
<tr>
<td><code>TINYINT</code></td>
<td><code>TINYINT</code></td>
<td>TINYINT</td>
<td>TINYINT</td>
</tr>
<tr>
<td>
<code>SMALLINT</code><br>
<code>TINYINT UNSIGNED</code></td>
<td><code>SMALLINT</code></td>
SMALLINT<br>
TINYINT UNSIGNED</td>
<td>SMALLINT</td>
</tr>
<tr>
<td>
<code>INT</code><br>
<code>MEDIUMINT</code><br>
<code>SMALLINT UNSIGNED</code></td>
<td><code>INT</code></td>
INT<br>
MEDIUMINT<br>
SMALLINT UNSIGNED</td>
<td>INT</td>
</tr>
<tr>
<td>
<code>BIGINT</code><br>
<code>INT UNSIGNED</code></td>
<td><code>BIGINT</code></td>
BIGINT<br>
INT UNSIGNED</td>
<td>BIGINT</td>
</tr>
<tr>
<td><code>BIGINT UNSIGNED</code></td>
<td><code>DECIMAL(20, 0)</code></td>
<td>BIGINT UNSIGNED</td>
<td>DECIMAL(20, 0)</td>
</tr>
<tr>
<td><code>BIGINT</code></td>
<td><code>BIGINT</code></td>
<td>BIGINT</td>
<td>BIGINT</td>
</tr>
<tr>
<td><code>FLOAT</code></td>
<td><code>FLOAT</code></td>
<td>FLOAT</td>
<td>FLOAT</td>
</tr>
<tr>
<td>
<code>DOUBLE</code><br>
<code>DOUBLE PRECISION</code></td>
<td><code>DOUBLE</code></td>
DOUBLE<br>
DOUBLE PRECISION</td>
<td>DOUBLE</td>
</tr>
<tr>
<td>
<code>NUMERIC(p, s)</code><br>
<code>DECIMAL(p, s)</code></td>
<td><code>DECIMAL(p, s)</code></td>
NUMERIC(p, s)<br>
DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
</tr>
<tr>
<td>
<code>BOOLEAN</code><br>
<code>TINYINT(1)</code></td>
<td><code>BOOLEAN</code></td>
BOOLEAN<br>
TINYINT(1)</td>
<td>BOOLEAN</td>
</tr>
<tr>
<td><code>DATE</code></td>
<td><code>DATE</code></td>
<td>DATE</td>
<td>DATE</td>
</tr>
<tr>
<td><code>TIME [(p)]</code></td>
<td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
<td>TIME [(p)]</td>
<td>TIME [(p)] [WITHOUT TIMEZONE]</td>
</tr>
<tr>
<td><code>DATETIME [(p)]</code></td>
<td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
<td>DATETIME [(p)]</td>
<td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
</tr>
<tr>
<td><code>TIMESTAMP [(p)]</code></td>
<td><code>TIMESTAMP [(p)]</code><br>
<code>TIMESTAMP [(p)] WITH LOCAL TIME ZONE</code>
<td>TIMESTAMP [(p)]</td>
<td>TIMESTAMP [(p)]<br>
TIMESTAMP [(p)] WITH LOCAL TIME ZONE
</td>
</tr>
<tr>
<td>
<code>CHAR(n)</code><br>
<code>VARCHAR(n)</code><br>
<code>TEXT</code></td>
<td><code>STRING</code></td>
CHAR(n)<br>
VARCHAR(n)<br>
TEXT</td>
<td>STRING</td>
</tr>
<tr>
<td>
<code>BINARY</code><br>
<code>VARBINARY</code><br>
<code>BLOB</code></td>
<td><code>BYTES</code></td>
BINARY<br>
VARBINARY<br>
BLOB</td>
<td>BYTES</td>
</tr>
</tbody>
</table>
</div>
FAQ
--------
### How to skip snapshot and only read from binlog?
#### Q1: How to skip snapshot and only read from binlog?
Please see [Startup Reading Position](#startup-reading-position) section.
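For reference, a minimal sketch, assuming the startup mode that skips the snapshot phase is `'latest-offset'` as described in that section:
```sql
-- Minimal sketch: start reading from the latest binlog position and skip the snapshot phase.
CREATE TABLE mysql_binlog_only (
  id INT,
  name STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'inventory',
  'table-name' = 'products',
  'scan.startup.mode' = 'latest-offset'  -- assumed value; see the Startup Reading Position section
);
```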
### How to read a shared database that contains multiple tables, e.g. user_00, user_01, ..., user_99 ?
#### Q2: How to read a shared database that contains multiple tables, e.g. user_00, user_01, ..., user_99 ?
The `table-name` option supports regular expressions, so you can set `table-name` to `user_.*` to monitor all tables with the `user_` prefix. The same applies to the `database-name` option. Note that the sharded tables should have the same schema.
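For example, a sketch of a source table that covers all sharded `user_` tables across `db_`-prefixed databases (the names here are placeholders):
```sql
-- Sketch: regular expressions in 'database-name' and 'table-name' match many shards at once.
CREATE TABLE all_users (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'db_.*',   -- placeholder: matches db_00, db_01, ...
  'table-name' = 'user_.*'     -- matches user_00 ... user_99
);
```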
### ConnectException: Received DML '...' for processing, binlog probably contains events generated with statement or mixed based replication format
#### Q3: ConnectException: Received DML '...' for processing, binlog probably contains events generated with statement or mixed based replication format
If you encounter the above exception, please check that `binlog_format` is `ROW`; you can verify this by running `show variables like '%binlog_format%'` in a MySQL client. Please note that even if the `binlog_format` configuration of your database is `ROW`, this configuration can be changed by other sessions, for example via `SET SESSION binlog_format='MIXED'; SET SESSION tx_isolation='REPEATABLE-READ'; COMMIT;`. Please also make sure no other sessions are changing this configuration.
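For example, you can check and, if necessary, change the format in a MySQL client (changing the global setting requires sufficient privileges and only takes effect for new sessions):
```sql
-- Check the current binlog format; it should report ROW.
SHOW VARIABLES LIKE '%binlog_format%';

-- If it is STATEMENT or MIXED, switch it to ROW (requires sufficient privileges;
-- already-connected sessions keep their session-level setting until they reconnect).
SET GLOBAL binlog_format = 'ROW';
```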

@ -1,16 +1,5 @@
# Postgres CDC Connector
* [Dependencies](#dependencies)
* [Maven dependency](#maven-dependency)
* [SQL Client JAR](#sql-client-jar)
* [How to create a Postgres CDC table](#how-to-create-a-postgres-cdc-table)
* [Connector Options](#connector-options)
* [Features](#features)
* [Exactly-Once Processing](#exactly-once-processing)
* [Single Thread Reading](#single-thread-reading)
* [DataStream Source](#datastream-source)
* [Data Type Mapping](#data-type-mapping)
The Postgres CDC connector allows for reading snapshot data and incremental data from a PostgreSQL database. This document describes how to set up the Postgres CDC connector to run SQL queries against PostgreSQL databases.
Dependencies
@ -24,13 +13,13 @@ In order to setup the Postgres CDC connector, the following table provides depen
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
<version>1.1.0</version>
<version>1.4.0</version>
</dependency>
```
### SQL Client JAR
Download [flink-sql-connector-postgres-cdc-1.1.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-postgres-cdc/1.1.0/flink-sql-connector-postgres-cdc-1.1.0.jar) and put it under `<FLINK_HOME>/lib/`.
Download [flink-sql-connector-postgres-cdc-1.4.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-postgres-cdc/1.4.0/flink-sql-connector-postgres-cdc-1.4.0.jar) and put it under `<FLINK_HOME>/lib/`.
How to create a Postgres CDC table
----------------
@ -63,7 +52,8 @@ SELECT * FROM shipments;
Connector Options
----------------
<table class="table table-bordered">
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
@ -75,63 +65,63 @@ Connector Options
</thead>
<tbody>
<tr>
<td><h5>connector</h5></td>
<td>connector</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what connector to use, here should be <code>'postgres-cdc'</code>.</td>
</tr>
<tr>
<td><h5>hostname</h5></td>
<td>hostname</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>IP address or hostname of the PostgreSQL database server.</td>
</tr>
<tr>
<td><h5>username</h5></td>
<td>username</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the PostgreSQL user to use when connecting to the PostgreSQL database server.</td>
</tr>
<tr>
<td><h5>password</h5></td>
<td>password</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password to use when connecting to the PostgreSQL database server.</td>
</tr>
<tr>
<td><h5>database-name</h5></td>
<td>database-name</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Database name of the PostgreSQL server to monitor.</td>
</tr>
<tr>
<td><h5>schema-name</h5></td>
<td>schema-name</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Schema name of the PostgreSQL database to monitor.</td>
</tr>
<tr>
<td><h5>table-name</h5></td>
<td>table-name</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Table name of the PostgreSQL database to monitor.</td>
</tr>
<tr>
<td><h5>port</h5></td>
<td>port</td>
<td>optional</td>
<td style="word-wrap: break-word;">5432</td>
<td>Integer</td>
<td>Integer port number of the PostgreSQL database server.</td>
</tr>
<tr>
<td><h5>decoding.plugin.name</h5></td>
<td>decoding.plugin.name</td>
<td>optional</td>
<td style="word-wrap: break-word;">decoderbufs</td>
<td>String</td>
@ -139,7 +129,7 @@ Connector Options
Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.</td>
</tr>
<tr>
<td><h5>slot.name</h5></td>
<td>slot.name</td>
<td>optional</td>
<td style="word-wrap: break-word;">flink</td>
<td>String</td>
@ -148,7 +138,7 @@ Connector Options
<br/>Slot names must conform to <a href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL replication slot naming rules</a>, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."</td>
</tr>
<tr>
<td><h5>debezium.*</h5></td>
<td>debezium.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
@ -158,6 +148,7 @@ Connector Options
</tr>
</tbody>
</table>
</div>
Note: it is recommended to set a different `slot.name` for each table to avoid the potential `PSQLException: ERROR: replication slot "flink" is active for PID 974` error. See more [here](https://debezium.io/documentation/reference/1.2/connectors/postgresql.html#postgresql-property-slot-name).
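For example, a sketch of a `postgres-cdc` table that assigns a dedicated replication slot and picks one of the supported decoding plugins; host, credentials, and names are placeholders:
```sql
-- Sketch: give each CDC table its own replication slot to avoid the
-- "replication slot ... is active for PID ..." error mentioned above.
CREATE TABLE shipments_cdc (
  shipment_id INT,
  order_id INT,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments',
  'decoding.plugin.name' = 'wal2json',   -- one of the supported logical decoding plugins
  'slot.name' = 'flink_shipments'        -- a slot name unique to this table/job
);
```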
@ -209,7 +200,8 @@ public class PostgreSQLSourceExample {
Data Type Mapping
----------------
<table class="table table-bordered">
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left"><a href="https://www.postgresql.org/docs/12/datatype.html">PostgreSQL type</a></th>
@ -219,82 +211,83 @@ Data Type Mapping
<tbody>
<tr>
<td></td>
<td><code>TINYINT</code></td>
<td>TINYINT</td>
</tr>
<tr>
<td>
<code>SMALLINT</code><br>
<code>INT2</code><br>
<code>SMALLSERIAL</code><br>
<code>SERIAL2</code></td>
<td><code>SMALLINT</code></td>
SMALLINT<br>
INT2<br>
SMALLSERIAL<br>
SERIAL2</td>
<td>SMALLINT</td>
</tr>
<tr>
<td>
<code>INTEGER</code><br>
<code>SERIAL</code></td>
<td><code>INT</code></td>
INTEGER<br>
SERIAL</td>
<td>INT</td>
</tr>
<tr>
<td>
<code>BIGINT</code><br>
<code>BIGSERIAL</code></td>
<td><code>BIGINT</code></td>
BIGINT<br>
BIGSERIAL</td>
<td>BIGINT</td>
</tr>
<tr>
<td></td>
<td><code>DECIMAL(20, 0)</code></td>
<td>DECIMAL(20, 0)</td>
</tr>
<tr>
<td><code>BIGINT</code></td>
<td><code>BIGINT</code></td>
<td>BIGINT</td>
<td>BIGINT</td>
</tr>
<tr>
<td>
<code>REAL</code><br>
<code>FLOAT4</code></td>
<td><code>FLOAT</code></td>
REAL<br>
FLOAT4</td>
<td>FLOAT</td>
</tr>
<tr>
<td>
<code>FLOAT8</code><br>
<code>DOUBLE PRECISION</code></td>
<td><code>DOUBLE</code></td>
FLOAT8<br>
DOUBLE PRECISION</td>
<td>DOUBLE</td>
</tr>
<tr>
<td>
<code>NUMERIC(p, s)</code><br>
<code>DECIMAL(p, s)</code></td>
<td><code>DECIMAL(p, s)</code></td>
NUMERIC(p, s)<br>
DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
</tr>
<tr>
<td><code>BOOLEAN</code></td>
<td><code>BOOLEAN</code></td>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
</tr>
<tr>
<td><code>DATE</code></td>
<td><code>DATE</code></td>
<td>DATE</td>
<td>DATE</td>
</tr>
<tr>
<td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
<td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
<td>TIME [(p)] [WITHOUT TIMEZONE]</td>
<td>TIME [(p)] [WITHOUT TIMEZONE]</td>
</tr>
<tr>
<td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
<td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
<td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
<td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
</tr>
<tr>
<td>
<code>CHAR(n)</code><br>
<code>CHARACTER(n)</code><br>
<code>VARCHAR(n)</code><br>
<code>CHARACTER VARYING(n)</code><br>
<code>TEXT</code></td>
<td><code>STRING</code></td>
CHAR(n)<br>
CHARACTER(n)<br>
VARCHAR(n)<br>
CHARACTER VARYING(n)<br>
TEXT</td>
<td>STRING</td>
</tr>
<tr>
<td><code>BYTEA</code></td>
<td><code>BYTES</code></td>
<td>BYTEA</td>
<td>BYTES</td>
</tr>
</tbody>
</table>
</div>

@ -0,0 +1,3 @@
# Downloads
Please see [Releases History](https://github.com/ververica/flink-cdc-connectors/releases)

@ -0,0 +1,116 @@
# Changelog JSON Format
Flink supports emitting changelogs in JSON format and interpreting the output back again.
Dependencies
------------
In order to set up the Changelog JSON format, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
### Maven dependency
```
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.4.0</version>
</dependency>
```
### SQL Client JAR
Download [flink-format-changelog-json-1.4.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-format-changelog-json/1.4.0/flink-format-changelog-json-1.4.0.jar) and put it under `<FLINK_HOME>/lib/`.
How to use Changelog JSON format
----------------
```sql
-- assuming we have user_behavior logs
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka', -- using kafka connector
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'earliest-offset', -- reading from the beginning
'properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address
'format' = 'json' -- the data format is json
);
-- we want to store the UV aggregation result in kafka using changelog-json format
CREATE TABLE day_uv (
day_str STRING,
uv BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'day_uv',
'scan.startup.mode' = 'earliest-offset', -- reading from the beginning
'properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address
'format' = 'changelog-json' -- the data format is changelog-json
);
-- write the UV results into kafka using changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, count(distinct user_id) as uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');
-- reading the changelog back again
SELECT * FROM day_uv;
```
Format Options
----------------
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 50%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>format</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what format to use, here should be 'changelog-json'.</td>
</tr>
<tr>
<td>changelog-json.ignore-parse-errors</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
<tr>
<td>changelog-json.timestamp-format.standard</td>
<td>optional</td>
<td style="word-wrap: break-word;">'SQL'</td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are 'SQL' and 'ISO-8601':
<ul>
<li>Option 'SQL' will parse input timestamps in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123', and output timestamps in the same format.</li>
<li>Option 'ISO-8601' will parse input timestamps in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123', and output timestamps in the same format.</li>
</ul>
</td>
</tr>
</tbody>
</table>
</div>
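For example, a sketch of a Kafka table that sets the two optional format properties listed above (the topic and broker address are placeholders):
```sql
-- Sketch: optional changelog-json format properties applied to a Kafka table.
CREATE TABLE day_uv_sink (
  day_str STRING,
  uv BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'day_uv',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'changelog-json',
  'changelog-json.ignore-parse-errors' = 'true',            -- skip rows/fields that fail to parse
  'changelog-json.timestamp-format.standard' = 'ISO-8601'   -- use ISO-8601 timestamps instead of the SQL default
);
```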
Data Type Mapping
----------------
Currently, the Changelog JSON format uses the JSON format for serialization and deserialization. Please refer to the [JSON format documentation](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/json.html#data-type-mapping) for more details about the data type mapping.

@ -0,0 +1,7 @@
# Formats
```{toctree}
:maxdepth: 2
changelog-json
```

@ -0,0 +1,8 @@
# Tutorials
```{toctree}
:maxdepth: 1
tutorial
tutorial-zh
```

@ -0,0 +1,288 @@
# Tutorial (Chinese)
1. Download `docker-compose.yml`:
```
version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=1234
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
      - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS="user_behavior:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
```
2. Enter the mysql container and initialize data:
```
docker-compose exec mysql mysql -uroot -p123456
```
```sql
-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether the order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
```
3. Enter the postgres container and initialize data:
```
docker-compose exec postgres psql -h localhost -U postgres
```
```sql
-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
```
4. Download the following JAR packages to `<FLINK_HOME>/lib/`:
- [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.11.1/flink-sql-connector-elasticsearch7_2.11-1.11.1.jar)
- [flink-sql-connector-mysql-cdc-1.0.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.0.0/flink-sql-connector-mysql-cdc-1.0.0.jar)
- [flink-sql-connector-postgres-cdc-1.0.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-postgres-cdc/1.0.0/flink-sql-connector-postgres-cdc-1.0.0.jar)
5. Then launch a Flink cluster and start the Flink SQL CLI:
```sql
-- Flink SQL
CREATE TABLE products (
id INT,
name STRING,
description STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);
INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```
6. Make some changes in MySQL and Postgres, then check the result in Elasticsearch:
```sql
--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
--PG
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
--MySQL
DELETE FROM orders WHERE order_id = 10004;
```
7. Kafka Changelog JSON format
Execute the following SQL in the Flink SQL CLI:
```sql
--Flink SQL
CREATE TABLE kafka_gmv (
day_str STRING,
gmv DECIMAL(10, 5)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_gmv',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'changelog-json'
);
INSERT INTO kafka_gmv
SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
FROM orders
WHERE order_status = true
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
-- read the changelog data back from Kafka and check the materialized result
SELECT * FROM kafka_gmv;
```
Check the output in Kafka:
```
docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic kafka_gmv --bootstrap-server kafka:9094 --from-beginning'
```
Update the orders data and check the output in the Flink SQL CLI and the Kafka console consumer:
```sql
-- MySQL
UPDATE orders SET order_status = true WHERE order_id = 10001;
UPDATE orders SET order_status = true WHERE order_id = 10002;
UPDATE orders SET order_status = true WHERE order_id = 10003;
INSERT INTO orders
VALUES (default, '2020-07-30 17:33:00', 'Timo', 50.00, 104, true);
UPDATE orders SET price = 40.00 WHERE order_id = 10005;
DELETE FROM orders WHERE order_id = 10005;
```

@ -0,0 +1,288 @@
# Tutorial
1. Create a `docker-compose.yml` file with the following contents:
```
version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=1234
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
      - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS="user_behavior:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
```
2. Enter the mysql container and initialize data:
```
docker-compose exec mysql mysql -uroot -p123456
```
```sql
-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
```
3. Enter the postgres container and initialize data:
```
docker-compose exec postgres psql -h localhost -U postgres
```
```sql
-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
```
4. Download the following JAR packages to `<FLINK_HOME>/lib/`:
- [flink-sql-connector-elasticsearch7_2.11-1.11.1.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.11.1/flink-sql-connector-elasticsearch7_2.11-1.11.1.jar)
- [flink-sql-connector-mysql-cdc-1.0.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.0.0/flink-sql-connector-mysql-cdc-1.0.0.jar)
- [flink-sql-connector-postgres-cdc-1.0.0.jar](https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-postgres-cdc/1.0.0/flink-sql-connector-postgres-cdc-1.0.0.jar)
5. Launch a Flink cluster, then start the Flink SQL CLI and execute the following SQL statements in it:
```sql
-- Flink SQL
CREATE TABLE products (
id INT,
name STRING,
description STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);
INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```
6. Make some changes in MySQL and Postgres, then check the result in Elasticsearch:
```sql
--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
--PG
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
--MySQL
DELETE FROM orders WHERE order_id = 10004;
```
7. Kafka Changelog JSON format
Execute the following SQL in the Flink SQL CLI:
```sql
-- Flink SQL
CREATE TABLE kafka_gmv (
day_str STRING,
gmv DECIMAL(10, 5)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_gmv',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'changelog-json'
);
INSERT INTO kafka_gmv
SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') as day_str, SUM(price) as gmv
FROM orders
WHERE order_status = true
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
-- consume the changelog data from Kafka, and check the result of the materialized view
SELECT * FROM kafka_gmv;
```
To consume the records in Kafka using `kafka-console-consumer`:
```
docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic kafka_gmv --bootstrap-server kafka:9094 --from-beginning'
```
Update the orders data and check the output in the Flink SQL CLI and the Kafka console consumer:
```sql
-- MySQL
UPDATE orders SET order_status = true WHERE order_id = 10001;
UPDATE orders SET order_status = true WHERE order_id = 10002;
UPDATE orders SET order_status = true WHERE order_id = 10003;
INSERT INTO orders
VALUES (default, '2020-07-30 17:33:00', 'Timo', 50.00, 104, true);
UPDATE orders SET price = 40.00 WHERE order_id = 10005;
DELETE FROM orders WHERE order_id = 10005;
```

@ -1,14 +1,16 @@
# Welcome to Flink CDC Connectors' documentation!
# Welcome to Flink CDC
```{toctree}
:maxdepth: 2
:caption: Contents
content/README
content/mysql-cdc
content/postgres-cdc
content/about
content/connectors/index
content/formats/index
content/tutorials/index
content/downloads
```
# Indices and tables
# Indices and Search
* {ref}`genindex`
* {ref}`search`
