diff --git a/docs/content/quickstart/oracle-tutorial.md b/docs/content/quickstart/oracle-tutorial.md index bc93a6162..c52e1faf6 100644 --- a/docs/content/quickstart/oracle-tutorial.md +++ b/docs/content/quickstart/oracle-tutorial.md @@ -1,6 +1,6 @@ # Demo: Oracle CDC to Elasticsearch -**1. Create `docker-compose.yml` file using following contents:** +**Create `docker-compose.yml` file using following contents:** ``` version: '2.1' @@ -34,7 +34,7 @@ services: - /var/run/docker.sock:/var/run/docker.sock ``` The Docker Compose environment consists of the following containers: -- Oracle: Oracle 11g and a pre-populated `products` and `orders` table in the database. +- Oracle: Oracle 19c database. - Elasticsearch: store the join result of the `orders` and `products` table. - Kibana: mainly used to visualize the data in Elasticsearch @@ -50,14 +50,78 @@ Don’t forget to run the following command to stop all containers after you fin docker-compose down ``` -**2. Download following JAR package to `/lib`** +**Download following JAR package to `/lib`** -*Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. * +**Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release-branches by yourself.** - [flink-sql-connector-elasticsearch7-3.0.1-1.17.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar) -- [flink-sql-connector-oracle-cdc-2.4.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.4.0/flink-sql-connector-oracle-cdc-2.4.0.jar) +- [flink-sql-connector-oracle-cdc-2.5-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.5-SNAPSHOT/flink-sql-connector-oracle-cdc-2.5-SNAPSHOT.jar) -**3. Launch a Flink cluster and start a Flink SQL CLI** + +**Preparing data in Oracle database** + +Introduce the tables in Oracle: +```shell +docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB +``` +```sql +BEGIN +EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.PRODUCTS'; +EXCEPTION + WHEN OTHERS THEN + IF SQLCODE != -942 THEN + RAISE; +END IF; +END; +/ + +CREATE TABLE DEBEZIUM.PRODUCTS ( + ID NUMBER(9, 0) NOT NULL, + NAME VARCHAR(255) NOT NULL, + DESCRIPTION VARCHAR(512), + WEIGHT FLOAT, + PRIMARY KEY(ID) +); + +BEGIN +EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.ORDERS'; +EXCEPTION + WHEN OTHERS THEN + IF SQLCODE != -942 THEN + RAISE; +END IF; +END; +/ + +CREATE TABLE DEBEZIUM.ORDERS ( + ID NUMBER(9, 0) NOT NULL, + ORDER_DATE TIMESTAMP(3) NOT NULL, + PURCHASER VARCHAR(255) NOT NULL, + QUANTITY NUMBER(9, 0) NOT NULL, + PRODUCT_ID NUMBER(9, 0) NOT NULL, + PRIMARY KEY(ID) +); + +ALTER TABLE DEBEZIUM.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; +ALTER TABLE DEBEZIUM.ORDERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +INSERT INTO DEBEZIUM.PRODUCTS VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (102, 'car battery', '12V car battery', 8.1); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (104, 'hammer', '12oz carpenter''s hammer', 0.75); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (105, 'hammer', '14oz carpenter''s hammer', 0.875); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (106, 'hammer', '16oz carpenter''s hammer', 1.0); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (107, 'rocks', 'box of assorted rocks', 5.3); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (108, 'jacket', 'water resistent black wind breaker', 0.1); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (109, 'spare tire', '24 inch spare tire', 22.2); + +INSERT INTO DEBEZIUM.ORDERS VALUES (1001, TO_TIMESTAMP('2020-07-30 10:08:22.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 101); +INSERT INTO DEBEZIUM.ORDERS VALUES (1002, TO_TIMESTAMP('2020-07-30 10:11:09.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Sally', 2, 102); +INSERT INTO DEBEZIUM.ORDERS VALUES (1003, TO_TIMESTAMP('2020-07-30 12:00:30.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Edward', 2, 103); +INSERT INTO DEBEZIUM.ORDERS VALUES (1004, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 104); +``` + +**Launch a Flink cluster and start a Flink SQL CLI** Execute following SQL statements in the Flink SQL CLI: @@ -75,38 +139,36 @@ Flink SQL> CREATE TABLE products ( 'connector' = 'oracle-cdc', 'hostname' = 'localhost', 'port' = '1521', - 'username' = 'flinkuser', - 'password' = 'flinkpw', + 'username' = 'dbzuser', + 'password' = 'dbz', 'database-name' = 'ORCLCDB', - 'schema-name' = 'flinkuser', + 'schema-name' = 'DEBEZIUM', 'table-name' = 'products' ); Flink SQL> CREATE TABLE orders ( - ORDER_ID INT, - ORDER_DATE TIMESTAMP_LTZ(3), - CUSTOMER_NAME STRING, - PRICE DECIMAL(10, 5), + ID INT, + ORDER_DATE TIMESTAMP(3), + PURCHASER STRING, + QUANTITY INT, PRODUCT_ID INT, ORDER_STATUS BOOLEAN ) WITH ( 'connector' = 'oracle-cdc', 'hostname' = 'localhost', 'port' = '1521', - 'username' = 'flinkuser', - 'password' = 'flinkpw', + 'username' = 'dbzuser', + 'password' = 'dbz', 'database-name' = 'ORCLCDB', - 'schema-name' = 'flinkuser', + 'schema-name' = 'DEBEZIUM', 'table-name' = 'orders' ); Flink SQL> CREATE TABLE enriched_orders ( ORDER_ID INT, - ORDER_DATE TIMESTAMP_LTZ(3), - CUSTOMER_NAME STRING, - PRICE DECIMAL(10, 5), - PRODUCT_ID INT, - ORDER_STATUS BOOLEAN, + ORDER_DATE TIMESTAMP(3), + PURCHASER STRING, + QUANTITY INT, PRODUCT_NAME STRING, PRODUCT_DESCRIPTION STRING, PRIMARY KEY (ORDER_ID) NOT ENFORCED @@ -117,27 +179,27 @@ Flink SQL> CREATE TABLE enriched_orders ( ); Flink SQL> INSERT INTO enriched_orders - SELECT o.*, p.NAME, p.DESCRIPTION + SELECT o.ID,o.ORDER_DATE,o.PURCHASER,o.QUANTITY, p.NAME, p.DESCRIPTION FROM orders AS o LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID; ``` -**4. Check result in Elasticsearch** +**Check result in Elasticsearch** Check the data has been written to Elasticsearch successfully, you can visit [Kibana](http://localhost:5601/) to see the data. -**5. Make changes in Oracle and watch result in Elasticsearch** +**Make changes in Oracle and watch result in Elasticsearch** Enter Oracle's container to make some changes in Oracle, then you can see the result in Elasticsearch will change after executing every SQL statement: ```shell -docker-compose exec sqlplus flinkuser/flinkpw +docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB ``` ```sql -INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0); +INSERT INTO DEBEZIUM.ORDERS VALUES (1005, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 5, 105); -UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004; +UPDATE DEBEZIUM.ORDERS SET QUANTITY = 10 WHERE ID = 1002; -DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004; +DELETE FROM DEBEZIUM.ORDERS WHERE ID = 1004; ``` \ No newline at end of file diff --git a/docs/content/快速上手/oracle-tutorial-zh.md b/docs/content/快速上手/oracle-tutorial-zh.md index e1ad4f7ee..6e7520807 100644 --- a/docs/content/快速上手/oracle-tutorial-zh.md +++ b/docs/content/快速上手/oracle-tutorial-zh.md @@ -1,6 +1,6 @@ # 演示: Oracle CDC 导入 Elasticsearch -**1. 创建`docker-compose.yml`文件,内容如下所示:** +**创建`docker-compose.yml`文件,内容如下所示:** ``` version: '2.1' @@ -34,7 +34,7 @@ services: - /var/run/docker.sock:/var/run/docker.sock ``` 该 Docker Compose 中包含的容器有: -- Oracle: Oracle 11g, 已经预先创建了 `products` 和 `orders`表,并插入了一些数据. +- Oracle: Oracle 19c 数据库 - Elasticsearch: `orders` 表将和 `products` 表进行join,join的结果写入Elasticsearch中 - Kibana: 可视化 Elasticsearch 中的数据 @@ -50,14 +50,79 @@ docker-compose up -d docker-compose down ```` -**2. 下载以下 jar 包到 `/lib/`:** +**下载以下 jar 包到 `/lib/`:** *下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译* - [flink-sql-connector-elasticsearch7-3.0.1-1.17.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar) - [flink-sql-connector-oracle-cdc-2.4.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.4.0/flink-sql-connector-oracle-cdc-2.4.0.jar) -**3. 然后启动 Flink 集群,再启动 SQL CLI:** + +**在 Oracle 数据库中准备数据** + +创建数据库和表 `products`,`orders`,并插入数据: + +```shell +docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB +``` +```sql +BEGIN +EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.PRODUCTS'; +EXCEPTION + WHEN OTHERS THEN + IF SQLCODE != -942 THEN + RAISE; +END IF; +END; +/ + +CREATE TABLE DEBEZIUM.PRODUCTS ( + ID NUMBER(9, 0) NOT NULL, + NAME VARCHAR(255) NOT NULL, + DESCRIPTION VARCHAR(512), + WEIGHT FLOAT, + PRIMARY KEY(ID) +); + +BEGIN +EXECUTE IMMEDIATE 'DROP TABLE DEBEZIUM.ORDERS'; +EXCEPTION + WHEN OTHERS THEN + IF SQLCODE != -942 THEN + RAISE; +END IF; +END; +/ + +CREATE TABLE DEBEZIUM.ORDERS ( + ID NUMBER(9, 0) NOT NULL, + ORDER_DATE TIMESTAMP(3) NOT NULL, + PURCHASER VARCHAR(255) NOT NULL, + QUANTITY NUMBER(9, 0) NOT NULL, + PRODUCT_ID NUMBER(9, 0) NOT NULL, + PRIMARY KEY(ID) +); + +ALTER TABLE DEBEZIUM.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; +ALTER TABLE DEBEZIUM.ORDERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +INSERT INTO DEBEZIUM.PRODUCTS VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (102, 'car battery', '12V car battery', 8.1); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (104, 'hammer', '12oz carpenter''s hammer', 0.75); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (105, 'hammer', '14oz carpenter''s hammer', 0.875); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (106, 'hammer', '16oz carpenter''s hammer', 1.0); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (107, 'rocks', 'box of assorted rocks', 5.3); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (108, 'jacket', 'water resistent black wind breaker', 0.1); +INSERT INTO DEBEZIUM.PRODUCTS VALUES (109, 'spare tire', '24 inch spare tire', 22.2); + +INSERT INTO DEBEZIUM.ORDERS VALUES (1001, TO_TIMESTAMP('2020-07-30 10:08:22.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 101); +INSERT INTO DEBEZIUM.ORDERS VALUES (1002, TO_TIMESTAMP('2020-07-30 10:11:09.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Sally', 2, 102); +INSERT INTO DEBEZIUM.ORDERS VALUES (1003, TO_TIMESTAMP('2020-07-30 12:00:30.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Edward', 2, 103); +INSERT INTO DEBEZIUM.ORDERS VALUES (1004, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 1, 104); +``` + +**然后启动 Flink 集群,再启动 SQL CLI:** ```sql -- Flink SQL @@ -73,38 +138,36 @@ Flink SQL> CREATE TABLE products ( 'connector' = 'oracle-cdc', 'hostname' = 'localhost', 'port' = '1521', - 'username' = 'flinkuser', - 'password' = 'flinkpw', + 'username' = 'dbzuser', + 'password' = 'dbz', 'database-name' = 'ORCLCDB', - 'schema-name' = 'flinkuser', + 'schema-name' = 'DEBEZIUM', 'table-name' = 'products' ); Flink SQL> CREATE TABLE orders ( - ORDER_ID INT, - ORDER_DATE TIMESTAMP_LTZ(3), - CUSTOMER_NAME STRING, - PRICE DECIMAL(10, 5), + ID INT, + ORDER_DATE TIMESTAMP(3), + PURCHASER STRING, + QUANTITY INT, PRODUCT_ID INT, ORDER_STATUS BOOLEAN ) WITH ( 'connector' = 'oracle-cdc', 'hostname' = 'localhost', 'port' = '1521', - 'username' = 'flinkuser', - 'password' = 'flinkpw', + 'username' = 'dbzuser', + 'password' = 'dbz', 'database-name' = 'ORCLCDB', - 'schema-name' = 'flinkuser', + 'schema-name' = 'DEBEZIUM', 'table-name' = 'orders' ); Flink SQL> CREATE TABLE enriched_orders ( ORDER_ID INT, - ORDER_DATE TIMESTAMP_LTZ(3), - CUSTOMER_NAME STRING, - PRICE DECIMAL(10, 5), - PRODUCT_ID INT, - ORDER_STATUS BOOLEAN, + ORDER_DATE TIMESTAMP(3), + PURCHASER STRING, + QUANTITY INT, PRODUCT_NAME STRING, PRODUCT_DESCRIPTION STRING, PRIMARY KEY (ORDER_ID) NOT ENFORCED @@ -115,27 +178,27 @@ Flink SQL> CREATE TABLE enriched_orders ( ); Flink SQL> INSERT INTO enriched_orders - SELECT o.*, p.NAME, p.DESCRIPTION + SELECT o.ID,o.ORDER_DATE,o.PURCHASER,o.QUANTITY, p.NAME, p.DESCRIPTION FROM orders AS o LEFT JOIN products AS p ON o.PRODUCT_ID = p.ID; ``` -**4. 检查 ElasticSearch 中的结果** +**检查 ElasticSearch 中的结果** 检查最终的结果是否写入ElasticSearch中, 可以在[Kibana](http://localhost:5601/)看到ElasticSearch中的数据 -**5. 在 Oracle 制造一些变更,观察 ElasticSearch 中的结果** +**在 Oracle 制造一些变更,观察 ElasticSearch 中的结果** 进入Oracle容器中并通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新。 ```shell -docker-compose exec sqlplus flinkuser/flinkpw +docker-compose exec oracle sqlplus debezium/dbz@localhost:1521/ORCLCDB ``` ```sql -INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0); +INSERT INTO DEBEZIUM.ORDERS VALUES (1005, TO_TIMESTAMP('2020-07-30 15:22:00.001000', 'YYYY-MM-DD HH24:MI:SS.FF'), 'Jark', 5, 105); -UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004; +UPDATE DEBEZIUM.ORDERS SET QUANTITY = 10 WHERE ID = 1002; -DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004; +DELETE FROM DEBEZIUM.ORDERS WHERE ID = 1004; ``` \ No newline at end of file