diff --git a/docs/content/connectors/db2-cdc.md b/docs/content/connectors/db2-cdc.md
index e625caa48..90b55fd6f 100644
--- a/docs/content/connectors/db2-cdc.md
+++ b/docs/content/connectors/db2-cdc.md
@@ -50,8 +50,8 @@ Notes
 
 ### Not support BOOLEAN type in SQL Replication on Db2
 
-Only snapshots can be taken from tables with BOOLEAN type columns. Currently SQL Replication on Db2 does not support BOOLEAN, so Debezium can not perform CDC on those tables.
-Consider using a different type.
+Only snapshots can be taken from tables with BOOLEAN type columns. Currently, SQL Replication on Db2 does not support BOOLEAN, so Debezium cannot perform CDC on those tables.
+Consider using another type in place of BOOLEAN.
 
 
 How to create a Db2 CDC table
@@ -203,36 +203,37 @@ _Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapsh
 ### DataStream Source
 
 ```java
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.connectors.db2.Db2Source;
 
 public class Db2SourceExample {
     public static void main(String[] args) throws Exception {
-        Db2Source db2Source = Db2Source.builder()
-                .hostname("yourHostname")
-                .port(yourPort)
-                .database("yourDatabaseName") // set captured database
-                .tableList("yourSchemaName.yourTableName") // set captured table
-                .username("yourUsername")
-                .password("yourPassword")
-                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
-                .build();
+        SourceFunction<String> db2Source =
+                Db2Source.<String>builder()
+                        .hostname("yourHostname")
+                        .port(50000)
+                        .database("yourDatabaseName") // set captured database
+                        .tableList("yourSchemaName.yourTableName") // set captured table
+                        .username("yourUsername")
+                        .password("yourPassword")
+                        .deserializer(
+                                new JsonDebeziumDeserializationSchema()) // converts SourceRecord to
+                        // JSON String
+                        .build();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // enable checkpoint
        env.enableCheckpointing(3000);
 
-        env
-            .fromSource(db2Source, WatermarkStrategy.noWatermarks(), "Db2 Source")
-            // set 4 parallel source tasks
-            .setParallelism(1)
-            .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
+        env.addSource(db2Source)
+                .print()
+                .setParallelism(1); // use parallelism 1 for sink to keep message ordering
 
-        env.execute("Print Db2 Snapshot + Binlog");
+        env.execute("Print Db2 Snapshot + Change Stream");
     }
 }
 ```
diff --git a/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2SourceTest.java b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2SourceTest.java
index 4b394d1b6..60887a6fe 100644
--- a/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2SourceTest.java
+++ b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2SourceTest.java
@@ -294,7 +294,7 @@ public class Db2SourceTest extends Db2TestBase {
         assertDelete(records.get(0), "ID", 1001);
 
         // ---------------------------------------------------------------------------
-        // Step-6: trigger checkpoint-2 to make sure we can continue to to further checkpoints
+        // Step-6: trigger checkpoint-2 to make sure we can continue to further checkpoints
         // ---------------------------------------------------------------------------
         synchronized (sourceContext3.getCheckpointLock()) {
             // checkpoint 3