# Streaming ETL for MySQL and Postgres with Flink CDC
This tutorial shows how to quickly build a streaming ETL pipeline for MySQL and Postgres with Flink CDC.
Assume we are running an e-commerce business. The product and order data are stored in MySQL, and the shipment data related to the orders is stored in Postgres.
We want to enrich the orders with the product and shipment tables, and then load the enriched orders into Elasticsearch in real time.
In the following sections, we will describe how to use the Flink MySQL/Postgres CDC connectors to implement it.
All exercises in this tutorial are performed in the Flink SQL CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or IDE installation.
The overview of the architecture is as follows:
![Flink CDC Streaming ETL ](/_static/fig/mysql-postgress-tutorial/flink-cdc-streaming-etl.png "Flink CDC Streaming ETL" )
## Preparation
Prepare a Linux or macOS computer with Docker installed.
### Starting the required components
The components required in this demo are all managed in containers, so we will use `docker-compose` to start them.
Create a `docker-compose.yml` file with the following contents:
```yaml
version: '2.1'
services:
  # ... mysql, postgres, and elasticsearch service definitions ...
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
```
The Docker Compose environment consists of the following containers:
- MySQL: the `products` and `orders` tables will be stored in this database. They will be joined with the data in Postgres to enrich the orders.
- Postgres: the `shipments` table will be stored in this database.
- Elasticsearch: mainly used as a data sink to store enriched orders.
- Kibana: used to visualize the data in Elasticsearch.
To start all containers, run the following command in the directory that contains the `docker-compose.yml` file.
```shell
docker-compose up -d
```
This command automatically starts all the containers defined in the Docker Compose configuration in detached mode. Run `docker ps` to check whether these containers are running properly.
We can also visit [http://localhost:5601/ ](http://localhost:5601/ ) to see if Kibana is running normally.
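As a quick command-line check, you can also hit Elasticsearch's root endpoint; the port below is an assumption based on Elasticsearch's default of 9200:

```shell
# Assumption: Elasticsearch is exposed on the default port 9200.
# A healthy node answers with a small JSON document containing the cluster name and version.
curl http://localhost:9200
```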
Don’t forget to run the following command to stop all containers after finishing the tutorial:
```shell
docker-compose down
```
### Preparing Flink and the required JAR packages
1. Download [Flink 1.13.2 ](https://downloads.apache.org/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz ) and unzip it to the directory `flink-1.13.2`
2. Download the following JAR packages and put them under `flink-1.13.2/lib/` (one way to do this with `wget` is shown after the list):
**Download links are available only for stable releases.**
- [flink-sql-connector-elasticsearch7_2.11-1.13.2.jar ](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar )
- [flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar ](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar )
- [flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar ](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar )
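A minimal sketch of fetching the three JARs from the command line; the URLs are the same ones listed above, and keep in mind the note about stable releases: the `-SNAPSHOT` links may not resolve until a corresponding artifact is published.

```shell
# Download the connector JARs straight into Flink's lib/ directory.
cd flink-1.13.2/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.13.2/flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1-SNAPSHOT/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.1-SNAPSHOT/flink-sql-connector-postgres-cdc-2.1-SNAPSHOT.jar
cd ../..
```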
### Preparing data in databases
#### Preparing data in MySQL
1. Enter mysql's container:
```shell
docker-compose exec mysql mysql -uroot -p123456
```
2. Create tables and populate data:
```sql
-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3 "),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
```
#### Preparing data in Postgres
1. Enter postgres's container:
```shell
docker-compose exec postgres psql -h localhost -U postgres
```
2. Create tables and populate data:
```sql
-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
```
## Starting Flink cluster and Flink SQL CLI
1. Use the following command to change to the Flink directory:
```
cd flink-1.13.2
```
2. Change the value of `taskmanager.numberOfTaskSlots` to 2 in `conf/flink-conf.yaml`, because we will run two tasks at the same time (the exact config line is shown after this list).
3. Use the following command to start a Flink cluster:
```shell
./bin/start-cluster.sh
```
Then we can visit [http://localhost:8081/ ](http://localhost:8081/ ) to see if Flink is running normally, and the web page looks like:
![Flink UI ](/_static/fig/mysql-postgress-tutorial/flink-ui.png "Flink UI" )
Don’t forget to run the following command to stop the Flink cluster after finishing the tutorial:
```shell
./bin/stop-cluster.sh
```
4. Use the following command to start a Flink SQL CLI:
```shell
./bin/sql-client.sh
```
We should see the welcome screen of the CLI client.
![Flink SQL Client ](/_static/fig/mysql-postgress-tutorial/flink-sql-client.png "Flink SQL Client" )
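As referenced in step 2, the task-slot change amounts to a single line in `conf/flink-conf.yaml` (the default value is 1):

```yaml
# conf/flink-conf.yaml
# Allow two concurrent tasks on the single local TaskManager.
taskmanager.numberOfTaskSlots: 2
```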
## Creating tables using Flink DDL in Flink SQL CLI
First, enable checkpoints every 3000 milliseconds:
```sql
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
```
Then, create tables that capture the change data from the corresponding database tables.
```sql
-- Flink SQL
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    ...
Flink SQL> CREATE TABLE shipments (
    ...
    'schema-name' = 'public',
    'table-name' = 'shipments'
  );
```
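For reference, a complete source table definition looks roughly like the following, using the `orders` table and the `mysql-cdc` connector as an example; the hostname, port, and credentials are assumptions that match the Docker setup used earlier in this tutorial:

```sql
-- Flink SQL
-- Sketch of a mysql-cdc source table for `orders`; the connection values below
-- (localhost:3306, root/123456, database `mydb`) assume the docker-compose setup above.
CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'orders'
  );
```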
Finally, create an `enriched_orders` table that is used to load the enriched data to Elasticsearch.
```sql
-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
    order_id INT,
    order_date TIMESTAMP(0),
    ...
    'hosts' = 'http://localhost:9200',
    'index' = 'enriched_orders'
  );
```
## Enriching orders and loading to Elasticsearch
Use Flink SQL to join the `orders` table with the `products` and `shipments` tables to enrich the orders and write the result to Elasticsearch.
```sql
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
  SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
  FROM orders AS o
  LEFT JOIN products AS p ON o.product_id = p.id
  LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```
Now, the enriched orders should be shown in Kibana.
Visit [http://localhost:5601/app/kibana#/management/kibana/index_pattern ](http://localhost:5601/app/kibana#/management/kibana/index_pattern ) to create an index pattern `enriched_orders`.
![Create Index Pattern ](/_static/fig/mysql-postgress-tutorial/kibana-create-index-pattern.png "Create Index Pattern" )
Visit [http://localhost:5601/app/kibana#/discover ](http://localhost:5601/app/kibana#/discover ) to find the enriched orders.
![Find enriched Orders ](/_static/fig/mysql-postgress-tutorial/kibana-detailed-orders.png "Find enriched Orders" )
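You can also query the sink index directly through Elasticsearch's REST API to confirm that documents are arriving (again assuming the default port 9200):

```shell
# Search the sink index; the hit count should match the three orders inserted earlier.
curl 'http://localhost:9200/enriched_orders/_search?pretty'
```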
Next, make some changes in the databases, and the enriched orders shown in Kibana will be updated after each step in real time.
1. Insert a new order in MySQL
```sql
--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
```
2. Insert a shipment in Postgres
```sql
--PG
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);
```
3. Update the order status in MySQL
```sql
--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
```
4. Update the shipment status in Postgres
```sql
--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;
```
5. Delete the order in MySQL
```sql
--MySQL
DELETE FROM orders WHERE order_id = 10004;
```
The changes of enriched orders in Kibana are as follows:
![Enriched Orders Changes ](/_static/fig/mysql-postgress-tutorial/kibana-detailed-orders-changes.gif "Enriched Orders Changes" )