[docs] Correct DataStream API usage in document examples

pull/444/head
mincwang 3 years ago committed by GitHub
parent f0dede1934
commit bbd2870b08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -66,19 +66,23 @@ Include following Maven dependency (available through Maven Central):
```java ```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.connectors.mysql.MySqlSource;
public class MySqlBinlogSourceExample { public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySqlSource.<String>builder() Properties debeziumProperties = new Properties();
.hostname("localhost") debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock
.port(3306) SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.databaseList("inventory") // monitor all tables under inventory database .hostname("yourHostname")
.username("flinkuser") .port(yourPort)
.password("flinkpw") .databaseList("yourDatabaseName") // set captured database
.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String .tableList("yourDatabaseName.yourTableName") // set captured table
.build(); .username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.debeziumProperties(debeziumProperties)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

@ -88,14 +88,18 @@ import com.ververica.cdc.connectors.mysql.MySqlSource;
public class MySqlBinlogSourceExample { public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySqlSource.<String>builder() Properties debeziumProperties = new Properties();
.hostname("localhost") debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock
.port(3306) SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.databaseList("inventory") // monitor all tables under inventory database .hostname("yourHostname")
.username("flinkuser") .port(yourPort)
.password("flinkpw") .databaseList("yourDatabaseName") // set captured database
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .tableList("yourDatabaseName.yourTableName") // set captured table
.build(); .username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.debeziumProperties(debeziumProperties)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

@ -370,14 +370,18 @@ import com.ververica.cdc.connectors.mysql.MySqlSource;
public class MySqlBinlogSourceExample { public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySqlSource.<String>builder() Properties debeziumProperties = new Properties();
.hostname("localhost") debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock
.port(3306) SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.databaseList("inventory") // monitor all tables under inventory database .hostname("yourHostname")
.username("flinkuser") .port(yourPort)
.password("flinkpw") .databaseList("yourDatabaseName") // set captured database
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .tableList("yourDatabaseName.yourTableName") // set captured table
.build(); .username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.debeziumProperties(debeziumProperties)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

@ -178,8 +178,9 @@ public class PostgreSQLSourceExample {
SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder() SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
.hostname("localhost") .hostname("localhost")
.port(5432) .port(5432)
.database("postgres") .database("postgres") // monitor postgres database
.schemaList("inventory") // monitor all tables under inventory schema .schemaList("inventory") // monitor inventory schema
.tableList("inventory.products") // monitor products table
.username("flinkuser") .username("flinkuser")
.password("flinkpw") .password("flinkpw")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String

Loading…
Cancel
Save