[common] Support json deserialization schema

pull/387/head
mincwang 3 years ago committed by GitHub
parent c52576f6d4
commit 7f12859304
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import java.util.HashMap;
/**
* A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the
* received {@link SourceRecord} to JSON String.
*/
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
private static final long serialVersionUID = 1L;
private static final JsonConverter CONVERTER = new JsonConverter();
public JsonDebeziumDeserializationSchema() {
this(false);
}
public JsonDebeziumDeserializationSchema(boolean includeSchema) {
final HashMap<String, Object> configs = new HashMap<>();
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
CONVERTER.configure(configs);
}
@Override
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
byte[] bytes =
CONVERTER.fromConnectData(record.topic(), record.valueSchema(), record.value());
out.collect(new String(bytes));
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}

@ -67,6 +67,13 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
<scope>test</scope>
</dependency>
<!-- test dependencies on Flink -->
<dependency>

@ -18,32 +18,55 @@
package com.ververica.cdc.connectors.mysql;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import static org.junit.Assert.assertTrue;
/** Integration tests for {@link MySqlSource}. */
@Ignore
public class MySqlSourceITCase extends MySqlTestBase {
private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");
private final UniqueDatabase fullTypesDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", "mysqluser", "mysqlpw");
@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testConsumingAllEvents() throws Exception {
inventoryDatabase.createAndInitialize();
SourceFunction<String> sourceFunction =
MySqlSource.<String>builder()
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.databaseList(
inventoryDatabase
.getDatabaseName()) // monitor all tables under inventory
// database
// monitor all tables under inventory database
.databaseList(inventoryDatabase.getDatabaseName())
.username(inventoryDatabase.getUsername())
.password(inventoryDatabase.getPassword())
.deserializer(new StringDebeziumDeserializationSchema())
@ -54,4 +77,113 @@ public class MySqlSourceITCase extends MySqlTestBase {
env.execute("Print MySQL Snapshot + Binlog");
}
@Test
public void testConsumingAllEventsWithJsonFormatIncludeSchema() throws Exception {
testConsumingAllEventsWithJsonFormat(true);
}
@Test
public void testConsumingAllTypesWithJsonFormatExcludeSchema() throws Exception {
testConsumingAllEventsWithJsonFormat(false);
}
private void testConsumingAllEventsWithJsonFormat(Boolean includeSchema) throws Exception {
fullTypesDatabase.createAndInitialize();
SourceFunction<String> sourceFunction =
MySqlSource.<String>builder()
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
// monitor all tables under column_type_test database
.databaseList(fullTypesDatabase.getDatabaseName())
.username(fullTypesDatabase.getUsername())
.password(fullTypesDatabase.getPassword())
.deserializer(new JsonDebeziumDeserializationSchema(includeSchema))
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build());
final String expectedFile =
includeSchema
? "file/debezium-data-schema-include.json"
: "file/debezium-data-schema-exclude.json";
final JSONObject expected =
JSONObject.parseObject(readLines(expectedFile), JSONObject.class);
JSONObject expectSnapshot = expected.getJSONObject("expected_snapshot");
DataStreamSource<String> source = env.addSource(sourceFunction);
tEnv.createTemporaryView("full_types", source);
TableResult result = tEnv.executeSql("SELECT * FROM full_types");
// check the snapshot result
CloseableIterator<Row> snapshot = result.collect();
waitForSnapshotStarted(snapshot);
assertTrue(
dataInJsonIsEquals(
fetchRows(snapshot, 1).get(0).toString(), expectSnapshot.toString()));
try (Connection connection = fullTypesDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
}
// check the binlog result
CloseableIterator<Row> binlog = result.collect();
JSONObject expectBinlog = expected.getJSONObject("expected_binlog");
assertTrue(
dataInJsonIsEquals(
fetchRows(binlog, 1).get(0).toString(), expectBinlog.toString()));
result.getJobClient().get().cancel().get();
}
private static List<Object> fetchRows(Iterator<Row> iter, int size) {
List<Object> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
// ignore rowKind marker
rows.add(row.getField(0));
size--;
}
return rows;
}
private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);
}
}
private static byte[] readLines(String resource) throws IOException, URISyntaxException {
Path path =
Paths.get(
Objects.requireNonNull(
MySqlSourceITCase.class
.getClassLoader()
.getResource(resource))
.toURI());
return Files.readAllBytes(path);
}
private static boolean dataInJsonIsEquals(String actual, String expect) {
JSONObject actualJsonObject = JSONObject.parseObject(actual);
JSONObject expectJsonObject = JSONObject.parseObject(expect);
if (expectJsonObject.getJSONObject("payload") != null
&& actualJsonObject.getJSONObject("payload") != null) {
expectJsonObject = expectJsonObject.getJSONObject("payload");
actualJsonObject = actualJsonObject.getJSONObject("payload");
}
return Objects.equals(
expectJsonObject.getJSONObject("after"),
actualJsonObject.getJSONObject("after"))
&& Objects.equals(
expectJsonObject.getJSONObject("before"),
actualJsonObject.getJSONObject("before"))
&& Objects.equals(expectJsonObject.get("op"), actualJsonObject.get("op"));
}
}

@ -0,0 +1,83 @@
{
"expected_snapshot": {
"before": null,
"after": {
"id": 1,
"tiny_c": 127,
"tiny_un_c": 255,
"small_c": 32767,
"small_un_c": 65535,
"int_c": 2147483647,
"int_un_c": 4294967295,
"int11_c": 2147483647,
"big_c": 9223372036854775807,
"varchar_c": "Hello World",
"char_c": "abc",
"float_c": 123.10199737548828,
"double_c": 404.4443,
"decimal_c": "EtaH",
"numeric_c": "AVo=",
"boolean_c": 1,
"date_c": 18460,
"time_c": 64822000000,
"datetime3_c": 1595008822123,
"datetime6_c": 1595008822123456,
"timestamp_c": "2020-07-17T18:00:22Z",
"file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA=="
},
"op": "r",
"transaction": null
},
"expected_binlog": {
"before": {
"id": 1,
"tiny_c": 127,
"tiny_un_c": 255,
"small_c": 32767,
"small_un_c": 65535,
"int_c": 2147483647,
"int_un_c": 4294967295,
"int11_c": 2147483647,
"big_c": 9223372036854775807,
"varchar_c": "Hello World",
"char_c": "abc",
"float_c": 123.10199737548828,
"double_c": 404.4443,
"decimal_c": "EtaH",
"numeric_c": "AVo=",
"boolean_c": 1,
"date_c": 18460,
"time_c": 64822000000,
"datetime3_c": 1595008822123,
"datetime6_c": 1595008822123456,
"timestamp_c": "2020-07-17T18:00:22Z",
"file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA=="
},
"after": {
"id": 1,
"tiny_c": 127,
"tiny_un_c": 255,
"small_c": 32767,
"small_un_c": 65535,
"int_c": 2147483647,
"int_un_c": 4294967295,
"int11_c": 2147483647,
"big_c": 9223372036854775807,
"varchar_c": "Hello World",
"char_c": "abc",
"float_c": 123.10199737548828,
"double_c": 404.4443,
"decimal_c": "EtaH",
"numeric_c": "AVo=",
"boolean_c": 1,
"date_c": 18460,
"time_c": 64822000000,
"datetime3_c": 1595008822123,
"datetime6_c": 1595008822123456,
"timestamp_c": "2020-07-17T18:33:22Z",
"file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA=="
},
"op": "u",
"transaction": null
}
}

@ -0,0 +1,943 @@
{
"expected_snapshot": {
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "int16",
"optional": true,
"field": "tiny_c"
},
{
"type": "int16",
"optional": true,
"field": "tiny_un_c"
},
{
"type": "int16",
"optional": true,
"field": "small_c"
},
{
"type": "int32",
"optional": true,
"field": "small_un_c"
},
{
"type": "int32",
"optional": true,
"field": "int_c"
},
{
"type": "int64",
"optional": true,
"field": "int_un_c"
},
{
"type": "int32",
"optional": true,
"field": "int11_c"
},
{
"type": "int64",
"optional": true,
"field": "big_c"
},
{
"type": "string",
"optional": true,
"field": "varchar_c"
},
{
"type": "string",
"optional": true,
"field": "char_c"
},
{
"type": "double",
"optional": true,
"field": "float_c"
},
{
"type": "double",
"optional": true,
"field": "double_c"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "4",
"connect.decimal.precision": "8"
},
"field": "decimal_c"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "0",
"connect.decimal.precision": "6"
},
"field": "numeric_c"
},
{
"type": "int16",
"optional": true,
"field": "boolean_c"
},
{
"type": "int32",
"optional": true,
"name": "io.debezium.time.Date",
"version": 1,
"field": "date_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTime",
"version": 1,
"field": "time_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "datetime3_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "datetime6_c"
},
{
"type": "string",
"optional": false,
"name": "io.debezium.time.ZonedTimestamp",
"version": 1,
"default": "1970-01-01T00:00:00Z",
"field": "timestamp_c"
},
{
"type": "bytes",
"optional": true,
"field": "file_uuid"
}
],
"optional": true,
"name": "mysql_binlog_source.column_type_test_2jid2b.full_types.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "int16",
"optional": true,
"field": "tiny_c"
},
{
"type": "int16",
"optional": true,
"field": "tiny_un_c"
},
{
"type": "int16",
"optional": true,
"field": "small_c"
},
{
"type": "int32",
"optional": true,
"field": "small_un_c"
},
{
"type": "int32",
"optional": true,
"field": "int_c"
},
{
"type": "int64",
"optional": true,
"field": "int_un_c"
},
{
"type": "int32",
"optional": true,
"field": "int11_c"
},
{
"type": "int64",
"optional": true,
"field": "big_c"
},
{
"type": "string",
"optional": true,
"field": "varchar_c"
},
{
"type": "string",
"optional": true,
"field": "char_c"
},
{
"type": "double",
"optional": true,
"field": "float_c"
},
{
"type": "double",
"optional": true,
"field": "double_c"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "4",
"connect.decimal.precision": "8"
},
"field": "decimal_c"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "0",
"connect.decimal.precision": "6"
},
"field": "numeric_c"
},
{
"type": "int16",
"optional": true,
"field": "boolean_c"
},
{
"type": "int32",
"optional": true,
"name": "io.debezium.time.Date",
"version": 1,
"field": "date_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTime",
"version": 1,
"field": "time_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "datetime3_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "datetime6_c"
},
{
"type": "string",
"optional": false,
"name": "io.debezium.time.ZonedTimestamp",
"version": 1,
"default": "1970-01-01T00:00:00Z",
"field": "timestamp_c"
},
{
"type": "bytes",
"optional": true,
"field": "file_uuid"
}
],
"optional": true,
"name": "mysql_binlog_source.column_type_test_2jid2b.full_types.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "mysql_binlog_source.column_type_test_2jid2b.full_types.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 1,
"tiny_c": 127,
"tiny_un_c": 255,
"small_c": 32767,
"small_un_c": 65535,
"int_c": 2147483647,
"int_un_c": 4294967295,
"int11_c": 2147483647,
"big_c": 9223372036854775807,
"varchar_c": "Hello World",
"char_c": "abc",
"float_c": 123.10199737548828,
"double_c": 404.4443,
"decimal_c": "EtaH",
"numeric_c": "AVo=",
"boolean_c": 1,
"date_c": 18460,
"time_c": 64822000000,
"datetime3_c": 1595008822123,
"datetime6_c": 1595008822123456,
"timestamp_c": "2020-07-17T18:00:22Z",
"file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA=="
},
"source": {
"version": "1.5.2.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1630315344066,
"snapshot": "last",
"db": "column_type_test_2jid2b",
"sequence": null,
"table": "full_types",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 1532,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1630315344085,
"transaction": null
}
},
"expected_binlog": {
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "int16",
"optional": true,
"field": "tiny_c"
},
{
"type": "int16",
"optional": true,
"field": "tiny_un_c"
},
{
"type": "int16",
"optional": true,
"field": "small_c"
},
{
"type": "int32",
"optional": true,
"field": "small_un_c"
},
{
"type": "int32",
"optional": true,
"field": "int_c"
},
{
"type": "int64",
"optional": true,
"field": "int_un_c"
},
{
"type": "int32",
"optional": true,
"field": "int11_c"
},
{
"type": "int64",
"optional": true,
"field": "big_c"
},
{
"type": "string",
"optional": true,
"field": "varchar_c"
},
{
"type": "string",
"optional": true,
"field": "char_c"
},
{
"type": "double",
"optional": true,
"field": "float_c"
},
{
"type": "double",
"optional": true,
"field": "double_c"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "4",
"connect.decimal.precision": "8"
},
"field": "decimal_c"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "0",
"connect.decimal.precision": "6"
},
"field": "numeric_c"
},
{
"type": "int16",
"optional": true,
"field": "boolean_c"
},
{
"type": "int32",
"optional": true,
"name": "io.debezium.time.Date",
"version": 1,
"field": "date_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTime",
"version": 1,
"field": "time_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "datetime3_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "datetime6_c"
},
{
"type": "string",
"optional": false,
"name": "io.debezium.time.ZonedTimestamp",
"version": 1,
"default": "1970-01-01T00:00:00Z",
"field": "timestamp_c"
},
{
"type": "bytes",
"optional": true,
"field": "file_uuid"
}
],
"optional": true,
"name": "mysql_binlog_source.column_type_test_13p6nkc.full_types.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "int16",
"optional": true,
"field": "tiny_c"
},
{
"type": "int16",
"optional": true,
"field": "tiny_un_c"
},
{
"type": "int16",
"optional": true,
"field": "small_c"
},
{
"type": "int32",
"optional": true,
"field": "small_un_c"
},
{
"type": "int32",
"optional": true,
"field": "int_c"
},
{
"type": "int64",
"optional": true,
"field": "int_un_c"
},
{
"type": "int32",
"optional": true,
"field": "int11_c"
},
{
"type": "int64",
"optional": true,
"field": "big_c"
},
{
"type": "string",
"optional": true,
"field": "varchar_c"
},
{
"type": "string",
"optional": true,
"field": "char_c"
},
{
"type": "double",
"optional": true,
"field": "float_c"
},
{
"type": "double",
"optional": true,
"field": "double_c"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "4",
"connect.decimal.precision": "8"
},
"field": "decimal_c"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "0",
"connect.decimal.precision": "6"
},
"field": "numeric_c"
},
{
"type": "int16",
"optional": true,
"field": "boolean_c"
},
{
"type": "int32",
"optional": true,
"name": "io.debezium.time.Date",
"version": 1,
"field": "date_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTime",
"version": 1,
"field": "time_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "datetime3_c"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "datetime6_c"
},
{
"type": "string",
"optional": false,
"name": "io.debezium.time.ZonedTimestamp",
"version": 1,
"default": "1970-01-01T00:00:00Z",
"field": "timestamp_c"
},
{
"type": "bytes",
"optional": true,
"field": "file_uuid"
}
],
"optional": true,
"name": "mysql_binlog_source.column_type_test_13p6nkc.full_types.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "mysql_binlog_source.column_type_test_13p6nkc.full_types.Envelope"
},
"payload": {
"before": {
"id": 1,
"tiny_c": 127,
"tiny_un_c": 255,
"small_c": 32767,
"small_un_c": 65535,
"int_c": 2147483647,
"int_un_c": 4294967295,
"int11_c": 2147483647,
"big_c": 9223372036854775807,
"varchar_c": "Hello World",
"char_c": "abc",
"float_c": 123.10199737548828,
"double_c": 404.4443,
"decimal_c": "EtaH",
"numeric_c": "AVo=",
"boolean_c": 1,
"date_c": 18460,
"time_c": 64822000000,
"datetime3_c": 1595008822123,
"datetime6_c": 1595008822123456,
"timestamp_c": "2020-07-17T18:00:22Z",
"file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA=="
},
"after": {
"id": 1,
"tiny_c": 127,
"tiny_un_c": 255,
"small_c": 32767,
"small_un_c": 65535,
"int_c": 2147483647,
"int_un_c": 4294967295,
"int11_c": 2147483647,
"big_c": 9223372036854775807,
"varchar_c": "Hello World",
"char_c": "abc",
"float_c": 123.10199737548828,
"double_c": 404.4443,
"decimal_c": "EtaH",
"numeric_c": "AVo=",
"boolean_c": 1,
"date_c": 18460,
"time_c": 64822000000,
"datetime3_c": 1595008822123,
"datetime6_c": 1595008822123456,
"timestamp_c": "2020-07-17T18:33:22Z",
"file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA=="
},
"source": {
"version": "1.5.2.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1630324170000,
"snapshot": "false",
"db": "column_type_test_13p6nkc",
"sequence": null,
"table": "full_types",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 1816,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1630324170753,
"transaction": null
}
}
}

@ -185,6 +185,7 @@ under the License.
<exclude>docs/_templates/version.html</exclude>
<!-- Tests -->
<exclude>**/*.txt</exclude>
<exclude>flink-connector-mysql-cdc/src/test/resources/file/*.json</exclude>
</excludes>
</configuration>
</plugin>

Loading…
Cancel
Save