@ -83,7 +83,7 @@ Include following Maven dependency (available through Maven Central):
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.MySqlSource;
public class MySqlBinlogSourceExample {
@ -94,7 +94,7 @@ public class MySqlBinlogSourceExample {
.databaseList("inventory") // monitor all tables under inventory database
.username("flinkuser")
.password("flinkpw")
.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@ -107,6 +107,138 @@ public class MySqlBinlogSourceExample {
}
}
```
### Deserialization
The following example shows an update change event in JSON format.
```json
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u", // the operation type, "u" means this is an update event
"ts_ms": 1589362330904, // the time at which the connector processed the event
"transaction": null
}
```
**Note:** Please refer to the [Debezium documentation](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-events) for the meaning of each field.
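As a small illustration of how downstream code might interpret the `op` field, the sketch below maps Debezium's operation codes to readable names. `ChangeEventOps` and `describe` are hypothetical names introduced for this example only and are not part of this connector or of Debezium. The sketch assumes the `op` value has already been extracted from the JSON message with a JSON library of your choice.

```java
// Hypothetical helper for illustration; not provided by the connector or by Debezium.
final class ChangeEventOps {
    private ChangeEventOps() {}

    /** Maps a Debezium "op" code to a readable operation name. */
    static String describe(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "c": return "create"; // row inserted
            case "u": return "update"; // row updated
            case "d": return "delete"; // row deleted
            case "r": return "read";   // row read during the initial snapshot
            default:  return "unknown"; // any code this sketch does not handle
        }
    }
}
```

For the event above, `ChangeEventOps.describe("u")` returns `"update"`.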
In some cases, users can use the `JsonDebeziumDeserializationSchema(true)` constructor to include the schema in the message. The Debezium JSON message then looks like this:
```json
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"default": "flink",
"field": "name"
},
{
"type": "string",
"optional": true,
"field": "description"
},
{
"type": "double",
"optional": true,
"field": "weight"
}
],
"optional": true,
"name": "mysql_binlog_source.inventory_1pzxhca.products.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"default": "flink",
"field": "name"
},
{
"type": "string",
"optional": true,
"field": "description"
},
{
"type": "double",
"optional": true,
"field": "weight"
}
],
"optional": true,
"name": "mysql_binlog_source.inventory_1pzxhca.products.Value",
"field": "after"
},
{
"type": "struct",
"fields": {...},
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "mysql_binlog_source.inventory_1pzxhca.products.Envelope"
},
"payload": {
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u", // the operation type, "u" means this is an update event
"ts_ms": 1589362330904, // the time at which the connector processed the event
"transaction": null
}
}
```
It is usually recommended to exclude the schema, because the schema fields make each message very verbose, which reduces parsing performance.
## Building from source