|
|
|
@ -1,6 +1,6 @@
|
|
|
|
|
# Demo: TIDB CDC to Elasticsearch
|
|
|
|
|
# Demo: TiDB CDC to Elasticsearch
|
|
|
|
|
|
|
|
|
|
**First,we will start TIDB cluster with docker.**
|
|
|
|
|
**First,we will start TiDB cluster with docker.**
|
|
|
|
|
|
|
|
|
|
```shell
|
|
|
|
|
$ git clone https://github.com/pingcap/tidb-docker-compose.git
|
|
|
|
@ -93,7 +93,7 @@ services:
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
The Docker Compose environment consists of the following containers:
|
|
|
|
|
- TIDB cluster: tikv、pd、tidb.
|
|
|
|
|
- TiDB cluster: tikv、pd、tidb.
|
|
|
|
|
- Elasticsearch: store the join result of the `orders` and `products` table.
|
|
|
|
|
- Kibana: mainly used to visualize the data in Elasticsearch.
|
|
|
|
|
|
|
|
|
@ -120,12 +120,12 @@ docker-compose down
|
|
|
|
|
- [flink-sql-connector-tidb-cdc-2.2-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.2-SNAPSHOT/flink-sql-connector-tidb-cdc-2.2-SNAPSHOT.jar)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**Preparing data in TIDB database**
|
|
|
|
|
**Preparing data in TiDB database**
|
|
|
|
|
|
|
|
|
|
Create databases/tables and populate data
|
|
|
|
|
|
|
|
|
|
```sql
|
|
|
|
|
-- TIDB
|
|
|
|
|
-- TiDB
|
|
|
|
|
CREATE DATABASE mydb;
|
|
|
|
|
USE mydb;
|
|
|
|
|
CREATE TABLE products (
|
|
|
|
@ -173,29 +173,24 @@ Flink SQL> CREATE TABLE products (
|
|
|
|
|
PRIMARY KEY (id) NOT ENFORCED
|
|
|
|
|
) WITH (
|
|
|
|
|
'connector' = 'tidb-cdc',
|
|
|
|
|
'hostname' = '127.0.0.1',
|
|
|
|
|
'tikv.grpc.timeout_in_ms' = '20000',
|
|
|
|
|
'pd-addresses' = '127.0.0.1:2379',
|
|
|
|
|
'username' = 'root',
|
|
|
|
|
'password' = '',
|
|
|
|
|
'database-name' = 'mydb',
|
|
|
|
|
'table-name' = 'products'
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
Flink SQL> CREATE TABLE orders (
|
|
|
|
|
id INT,
|
|
|
|
|
order_date DATE,
|
|
|
|
|
purchaser INT,
|
|
|
|
|
quantity INT,
|
|
|
|
|
order_id INT,
|
|
|
|
|
order_date TIMESTAMP(3),
|
|
|
|
|
customer_name STRING,
|
|
|
|
|
price DECIMAL(10, 5),
|
|
|
|
|
product_id INT,
|
|
|
|
|
PRIMARY KEY (id) NOT ENFORCED
|
|
|
|
|
order_status BOOLEAN,
|
|
|
|
|
PRIMARY KEY (order_id) NOT ENFORCED
|
|
|
|
|
) WITH (
|
|
|
|
|
'connector' = 'tidb-cdc',
|
|
|
|
|
'hostname' = '127.0.0.1',
|
|
|
|
|
'tikv.grpc.timeout_in_ms' = '20000',
|
|
|
|
|
'pd-addresses' = '127.0.0.1:2379',
|
|
|
|
|
'username' = 'root',
|
|
|
|
|
'password' = '',
|
|
|
|
|
'database-name' = 'mydb',
|
|
|
|
|
'table-name' = 'orders'
|
|
|
|
|
);
|
|
|
|
@ -203,8 +198,8 @@ Flink SQL> CREATE TABLE orders (
|
|
|
|
|
Flink SQL> CREATE TABLE enriched_orders (
|
|
|
|
|
order_id INT,
|
|
|
|
|
order_date DATE,
|
|
|
|
|
purchaser INT,
|
|
|
|
|
quantity INT,
|
|
|
|
|
customer_name STRING,
|
|
|
|
|
order_status BOOLEAN,
|
|
|
|
|
product_name STRING,
|
|
|
|
|
product_description STRING,
|
|
|
|
|
PRIMARY KEY (order_id) NOT ENFORCED
|
|
|
|
@ -215,7 +210,7 @@ Flink SQL> CREATE TABLE enriched_orders (
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
Flink SQL> INSERT INTO enriched_orders
|
|
|
|
|
SELECT o.id,o.order_date,o.purchaser,o.quantity, p.name, p.description
|
|
|
|
|
SELECT o.order_id, o.order_date, o.customer_name, o.order_status, p.name, p.description
|
|
|
|
|
FROM orders AS o
|
|
|
|
|
LEFT JOIN products AS p ON o.product_id = p.id;
|
|
|
|
|
```
|
|
|
|
@ -224,7 +219,7 @@ Flink SQL> INSERT INTO enriched_orders
|
|
|
|
|
|
|
|
|
|
Check the data has been written to Elasticsearch successfully, you can visit [Kibana](http://localhost:5601/) to see the data.
|
|
|
|
|
|
|
|
|
|
**Make changes in TIDB and watch result in Elasticsearch**
|
|
|
|
|
**Make changes in TiDB and watch result in Elasticsearch**
|
|
|
|
|
|
|
|
|
|
Do some changes in the databases, and then the enriched orders shown in Kibana will be updated after each step in real time.
|
|
|
|
|
|
|
|
|
|