From 40034abe01aafe5bbcfa4e045412f83cc2125fa0 Mon Sep 17 00:00:00 2001
From: gongzhongqiang <764629910@qq.com>
Date: Fri, 18 Feb 2022 19:16:32 +0800
Subject: [PATCH] [docs][sqlserver] Add SqlServer CDC connector document (#705)

---
 docs/content/connectors/index.md         |   1 +
 docs/content/connectors/sqlserver-cdc.md | 368 +++++++++++++++++++++++
 2 files changed, 369 insertions(+)
 create mode 100644 docs/content/connectors/sqlserver-cdc.md

diff --git a/docs/content/connectors/index.md b/docs/content/connectors/index.md
index e9bf85d7f..12748d90d 100644
--- a/docs/content/connectors/index.md
+++ b/docs/content/connectors/index.md
@@ -7,4 +7,5 @@ mysql-cdc
 postgres-cdc
 mongodb-cdc
 oracle-cdc
+sqlserver-cdc
 ```
diff --git a/docs/content/connectors/sqlserver-cdc.md b/docs/content/connectors/sqlserver-cdc.md
new file mode 100644
index 000000000..2d3f0b71b
--- /dev/null
+++ b/docs/content/connectors/sqlserver-cdc.md
@@ -0,0 +1,368 @@

# SQLServer CDC Connector

The SQLServer CDC connector allows for reading snapshot data and incremental data from a SQLServer database. This document describes how to set up the SQLServer CDC connector to run SQL queries against SQLServer databases.

Dependencies
------------

In order to set up the SQLServer CDC connector, the following provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

### Maven dependency

```xml
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-sqlserver-cdc</artifactId>
  <!-- The dependency is available only for stable releases. -->
  <version>2.2-SNAPSHOT</version>
</dependency>
```

### SQL Client JAR

```Download link is available only for stable releases.```

Download [flink-sql-connector-sqlserver-cdc-2.2-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.2-SNAPSHOT/flink-sql-connector-sqlserver-cdc-2.2-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`.

How to create a SQLServer CDC table
----------------

The SqlServer CDC table can be defined as follows:

```sql
-- register a SqlServer table 'orders' in Flink SQL
CREATE TABLE orders (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'orders'
);

-- read snapshot and change events from the orders table
SELECT * FROM orders;
```
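Note that the connector reads incremental data from SQL Server's change tables, so CDC has to be enabled for the monitored database and for each monitored table beforehand (see the Debezium SQLServer connector documentation for details). The statements below are only a minimal sketch for the `inventory.dbo.orders` example above, using the standard SQL Server procedures; run them on the SQL Server side with sufficient privileges and adjust names to your environment:

```sql
-- Sketch: enable CDC for the 'inventory' database and the 'dbo.orders' table
-- (requires sysadmin / db_owner permissions).
USE inventory;
EXEC sys.sp_cdc_enable_db;
EXEC sys.sp_cdc_enable_table
    @source_schema        = N'dbo',
    @source_name          = N'orders',
    @role_name            = NULL,
    @supports_net_changes = 0;
```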
Connector Options
----------------

| Option | Required | Default | Type | Description |
|--------|----------|---------|------|-------------|
| connector | required | (none) | String | Specify what connector to use, here should be `'sqlserver-cdc'`. |
| hostname | required | (none) | String | IP address or hostname of the SQLServer database. |
| username | required | (none) | String | Username to use when connecting to the SQLServer database. |
| password | required | (none) | String | Password to use when connecting to the SQLServer database. |
| database-name | required | (none) | String | Database name of the SQLServer database to monitor. |
| schema-name | required | (none) | String | Schema name of the SQLServer database to monitor. |
| table-name | required | (none) | String | Table name of the SQLServer database to monitor. |
| port | optional | 1433 | Integer | Integer port number of the SQLServer database. |
| server-time-zone | optional | UTC | String | The session time zone in database server, e.g. "Asia/Shanghai". |
| debezium.* | optional | (none) | String | Pass-through Debezium's properties to the Debezium Embedded Engine which is used to capture data changes from SQLServer. For example: `'debezium.snapshot.mode' = 'initial_only'` (see the sketch below the table). See more about [Debezium's SQLServer Connector properties](https://debezium.io/documentation/reference/1.5/connectors/sqlserver.html#sqlserver-connector-properties). |

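As referenced in the `debezium.*` row above, options prefixed with `debezium.` are forwarded to the embedded Debezium SQLServer connector with the prefix stripped. The following is a minimal sketch based on the `orders` table defined earlier; the table name `orders_snapshot_only` and the chosen property value are only examples:

```sql
CREATE TABLE orders_snapshot_only (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'orders',
    -- forwarded to Debezium as 'snapshot.mode': read the snapshot only, no change stream
    'debezium.snapshot.mode' = 'initial_only'
);
```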
Available Metadata
----------------

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
| Key | DataType | Description |
|-----|----------|-------------|
| table_name | STRING NOT NULL | Name of the table that contains the row. |
| schema_name | STRING NOT NULL | Name of the schema that contains the row. |
| database_name | STRING NOT NULL | Name of the database that contains the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. If the record is read from the snapshot of the table instead of the change stream, the value is always 0. |

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

```sql
CREATE TABLE products (
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10,3)
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'products'
);
```

Limitation
--------

### Can't perform checkpoint during scanning snapshot of tables

During the snapshot scan of database tables there is no recoverable position, so we can't perform checkpoints. In order not to perform checkpoints, the SqlServer CDC source keeps the checkpoint waiting until it times out. A timed-out checkpoint is recognized as a failed checkpoint, which by default triggers a failover of the Flink job. So if the database table is large, it is recommended to add the following Flink configurations to avoid failover caused by timed-out checkpoints:

```
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
```

Features
--------

### Exactly-Once Processing

The SQLServer CDC connector is a Flink Source connector which reads the database snapshot first and then continues to read change events with **exactly-once processing**, even when failures happen. Please read [How the connector works](https://debezium.io/documentation/reference/1.5/connectors/sqlserver.html#how-the-sqlserver-connector-works).

### Startup Reading Position

The config option `scan.startup.mode` specifies the startup mode for the SQLServer CDC consumer. The valid enumerations are:

- `initial` (default): Takes a snapshot of structure and data of captured tables; useful if topics should be populated with a complete representation of the data from the captured tables.
- `initial_only`: Takes a snapshot of structure and data like `initial`, but does not transition into streaming changes once the snapshot has completed.
- `latest_offset`: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics.

_Note: the mechanism of the `scan.startup.mode` option relies on Debezium's `snapshot.mode` configuration, so please do not use them together. If you specify both `scan.startup.mode` and `debezium.snapshot.mode` in the table DDL, `scan.startup.mode` may not work._

### Single Thread Reading

The SQLServer CDC source can't read in parallel, because only one task can receive change events.

### DataStream Source

The SQLServer CDC connector can also be a DataStream source.
You can create a SourceFunction as follows:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;

public class SqlServerSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
      .hostname("localhost")
      .port(1433)
      .database("sqlserver") // monitor sqlserver database
      .tableList("dbo.products") // monitor products table
      .username("sa")
      .password("Password!")
      .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}
```
**Note:** Please refer to [Deserialization](../about.html#deserialization) for more details about the JSON deserialization.

Data Type Mapping
----------------
| SQLServer type | Flink SQL type |
|----------------|----------------|
| char(n) | CHAR(n) |
| varchar(n)<br>nvarchar(n)<br>nchar(n) | VARCHAR(n) |
| text<br>ntext<br>xml | STRING |
| decimal(p, s)<br>money<br>smallmoney | DECIMAL(p, s) |
| numeric | NUMERIC |
| float<br>real | FLOAT |
| bit | BOOLEAN |
| int | INT |
| tinyint | TINYINT |
| smallint | SMALLINT |
| bigint | BIGINT |
| date | DATE |
| time(n) | TIME(n) |
| datetime2<br>datetime<br>smalldatetime | TIMESTAMP(n) |
| datetimeoffset | TIMESTAMP_LTZ(3) |

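As a rough illustration of the mapping above, the sketch below shows a hypothetical SQLServer table and a Flink table definition whose column types follow the mapping table; all table and column names are made up for this example, and the connection options simply reuse the values from the earlier examples:

```sql
-- SQLServer side: a hypothetical source table (for illustration only)
CREATE TABLE dbo.shipments (
    shipment_id INT NOT NULL PRIMARY KEY,
    order_ref   VARCHAR(64),
    is_express  BIT,
    weight      DECIMAL(10, 3),
    shipped_at  DATETIME2(3),
    modified_at DATETIMEOFFSET
);

-- Flink SQL side: column types chosen according to the mapping table above
CREATE TABLE shipments (
    shipment_id INT,
    order_ref   VARCHAR(64),
    is_express  BOOLEAN,
    weight      DECIMAL(10, 3),
    shipped_at  TIMESTAMP(3),
    modified_at TIMESTAMP_LTZ(3),
    PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'shipments'
);
```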