diff --git a/README.md b/README.md index 9d849cb12..750062f8e 100644 --- a/README.md +++ b/README.md @@ -66,19 +66,23 @@ Include following Maven dependency (available through Maven Central): ```java import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema; +import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.connectors.mysql.MySqlSource; public class MySqlBinlogSourceExample { public static void main(String[] args) throws Exception { - SourceFunction sourceFunction = MySqlSource.builder() - .hostname("localhost") - .port(3306) - .databaseList("inventory") // monitor all tables under inventory database - .username("flinkuser") - .password("flinkpw") - .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String - .build(); + Properties debeziumProperties = new Properties(); + debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock + SourceFunction sourceFunction = MySQLSource.builder() + .hostname("yourHostname") + .port(yourPort) + .databaseList("yourDatabaseName") // set captured database + .tableList("yourDatabaseName.yourTableName") // set captured table + .username("yourUsername") + .password("yourPassword") + .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String + .debeziumProperties(debeziumProperties) + .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/docs/content/about.md b/docs/content/about.md index 5fbdd715e..fa3b1d69e 100644 --- a/docs/content/about.md +++ b/docs/content/about.md @@ -88,14 +88,18 @@ import com.ververica.cdc.connectors.mysql.MySqlSource; public class MySqlBinlogSourceExample { public static void main(String[] args) throws Exception { - SourceFunction sourceFunction = MySqlSource.builder() - .hostname("localhost") - .port(3306) - .databaseList("inventory") // monitor all tables under inventory database - .username("flinkuser") - .password("flinkpw") - .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String - .build(); + Properties debeziumProperties = new Properties(); + debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock + SourceFunction sourceFunction = MySQLSource.builder() + .hostname("yourHostname") + .port(yourPort) + .databaseList("yourDatabaseName") // set captured database + .tableList("yourDatabaseName.yourTableName") // set captured table + .username("yourUsername") + .password("yourPassword") + .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String + .debeziumProperties(debeziumProperties) + .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/docs/content/connectors/mysql-cdc.md b/docs/content/connectors/mysql-cdc.md index 98d71ca9f..afe6f91b2 100644 --- a/docs/content/connectors/mysql-cdc.md +++ b/docs/content/connectors/mysql-cdc.md @@ -370,14 +370,18 @@ import com.ververica.cdc.connectors.mysql.MySqlSource; public class MySqlBinlogSourceExample { public static void main(String[] args) throws Exception { - SourceFunction sourceFunction = MySqlSource.builder() - .hostname("localhost") - .port(3306) - .databaseList("inventory") // monitor all tables under inventory database - .username("flinkuser") - .password("flinkpw") - .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String - .build(); + Properties debeziumProperties = new Properties(); + debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock + SourceFunction sourceFunction = MySQLSource.builder() + .hostname("yourHostname") + .port(yourPort) + .databaseList("yourDatabaseName") // set captured database + .tableList("yourDatabaseName.yourTableName") // set captured table + .username("yourUsername") + .password("yourPassword") + .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String + .debeziumProperties(debeziumProperties) + .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/docs/content/connectors/postgres-cdc.md b/docs/content/connectors/postgres-cdc.md index c381d802a..efff72eae 100644 --- a/docs/content/connectors/postgres-cdc.md +++ b/docs/content/connectors/postgres-cdc.md @@ -178,8 +178,9 @@ public class PostgreSQLSourceExample { SourceFunction sourceFunction = PostgreSQLSource.builder() .hostname("localhost") .port(5432) - .database("postgres") - .schemaList("inventory") // monitor all tables under inventory schema + .database("postgres") // monitor postgres database + .schemaList("inventory") // monitor inventory schema + .tableList("inventory.products") // monitor products table .username("flinkuser") .password("flinkpw") .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String