diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/JsonDebeziumDeserializationSchema.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/JsonDebeziumDeserializationSchema.java new file mode 100644 index 000000000..de74b8caa --- /dev/null +++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/JsonDebeziumDeserializationSchema.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.debezium; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; + +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; + +import java.util.HashMap; + +/** + * A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the + * received {@link SourceRecord} to JSON String. + */ +public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema { + + private static final long serialVersionUID = 1L; + private static final JsonConverter CONVERTER = new JsonConverter(); + + public JsonDebeziumDeserializationSchema() { + this(false); + } + + public JsonDebeziumDeserializationSchema(boolean includeSchema) { + final HashMap configs = new HashMap<>(); + configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); + configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema); + CONVERTER.configure(configs); + } + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + byte[] bytes = + CONVERTER.fromConnectData(record.topic(), record.valueSchema(), record.value()); + out.collect(new String(bytes)); + } + + @Override + public TypeInformation getProducedType() { + return BasicTypeInfo.STRING_TYPE_INFO; + } +} diff --git a/flink-connector-mysql-cdc/pom.xml b/flink-connector-mysql-cdc/pom.xml index 3cdc51ece..776b4eb9a 100644 --- a/flink-connector-mysql-cdc/pom.xml +++ b/flink-connector-mysql-cdc/pom.xml @@ -67,6 +67,13 @@ under the License. test + + com.alibaba + fastjson + 1.2.78 + test + + diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceITCase.java index 18c05d6e1..c3c37886b 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceITCase.java @@ -18,32 +18,55 @@ package com.ververica.cdc.connectors.mysql; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; +import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.junit.Ignore; import org.junit.Test; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +import static org.junit.Assert.assertTrue; + /** Integration tests for {@link MySqlSource}. */ -@Ignore public class MySqlSourceITCase extends MySqlTestBase { private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw"); + private final UniqueDatabase fullTypesDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", "mysqluser", "mysqlpw"); + @Test + @Ignore("Test ignored because it won't stop and is used for manual test") public void testConsumingAllEvents() throws Exception { inventoryDatabase.createAndInitialize(); SourceFunction sourceFunction = MySqlSource.builder() .hostname(MYSQL_CONTAINER.getHost()) .port(MYSQL_CONTAINER.getDatabasePort()) - .databaseList( - inventoryDatabase - .getDatabaseName()) // monitor all tables under inventory - // database + // monitor all tables under inventory database + .databaseList(inventoryDatabase.getDatabaseName()) .username(inventoryDatabase.getUsername()) .password(inventoryDatabase.getPassword()) .deserializer(new StringDebeziumDeserializationSchema()) @@ -54,4 +77,113 @@ public class MySqlSourceITCase extends MySqlTestBase { env.execute("Print MySQL Snapshot + Binlog"); } + + @Test + public void testConsumingAllEventsWithJsonFormatIncludeSchema() throws Exception { + testConsumingAllEventsWithJsonFormat(true); + } + + @Test + public void testConsumingAllTypesWithJsonFormatExcludeSchema() throws Exception { + testConsumingAllEventsWithJsonFormat(false); + } + + private void testConsumingAllEventsWithJsonFormat(Boolean includeSchema) throws Exception { + fullTypesDatabase.createAndInitialize(); + SourceFunction sourceFunction = + MySqlSource.builder() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + // monitor all tables under column_type_test database + .databaseList(fullTypesDatabase.getDatabaseName()) + .username(fullTypesDatabase.getUsername()) + .password(fullTypesDatabase.getPassword()) + .deserializer(new JsonDebeziumDeserializationSchema(includeSchema)) + .build(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(1000); + StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, + EnvironmentSettings.newInstance() + .useBlinkPlanner() + .inStreamingMode() + .build()); + final String expectedFile = + includeSchema + ? "file/debezium-data-schema-include.json" + : "file/debezium-data-schema-exclude.json"; + final JSONObject expected = + JSONObject.parseObject(readLines(expectedFile), JSONObject.class); + JSONObject expectSnapshot = expected.getJSONObject("expected_snapshot"); + + DataStreamSource source = env.addSource(sourceFunction); + tEnv.createTemporaryView("full_types", source); + TableResult result = tEnv.executeSql("SELECT * FROM full_types"); + + // check the snapshot result + CloseableIterator snapshot = result.collect(); + waitForSnapshotStarted(snapshot); + assertTrue( + dataInJsonIsEquals( + fetchRows(snapshot, 1).get(0).toString(), expectSnapshot.toString())); + try (Connection connection = fullTypesDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;"); + } + + // check the binlog result + CloseableIterator binlog = result.collect(); + JSONObject expectBinlog = expected.getJSONObject("expected_binlog"); + assertTrue( + dataInJsonIsEquals( + fetchRows(binlog, 1).get(0).toString(), expectBinlog.toString())); + result.getJobClient().get().cancel().get(); + } + + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + // ignore rowKind marker + rows.add(row.getField(0)); + size--; + } + return rows; + } + + private static void waitForSnapshotStarted(CloseableIterator iterator) throws Exception { + while (!iterator.hasNext()) { + Thread.sleep(100); + } + } + + private static byte[] readLines(String resource) throws IOException, URISyntaxException { + Path path = + Paths.get( + Objects.requireNonNull( + MySqlSourceITCase.class + .getClassLoader() + .getResource(resource)) + .toURI()); + return Files.readAllBytes(path); + } + + private static boolean dataInJsonIsEquals(String actual, String expect) { + JSONObject actualJsonObject = JSONObject.parseObject(actual); + JSONObject expectJsonObject = JSONObject.parseObject(expect); + if (expectJsonObject.getJSONObject("payload") != null + && actualJsonObject.getJSONObject("payload") != null) { + expectJsonObject = expectJsonObject.getJSONObject("payload"); + actualJsonObject = actualJsonObject.getJSONObject("payload"); + } + return Objects.equals( + expectJsonObject.getJSONObject("after"), + actualJsonObject.getJSONObject("after")) + && Objects.equals( + expectJsonObject.getJSONObject("before"), + actualJsonObject.getJSONObject("before")) + && Objects.equals(expectJsonObject.get("op"), actualJsonObject.get("op")); + } } diff --git a/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude.json b/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude.json new file mode 100644 index 000000000..6dfa12132 --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-exclude.json @@ -0,0 +1,83 @@ +{ + "expected_snapshot": { + "before": null, + "after": { + "id": 1, + "tiny_c": 127, + "tiny_un_c": 255, + "small_c": 32767, + "small_un_c": 65535, + "int_c": 2147483647, + "int_un_c": 4294967295, + "int11_c": 2147483647, + "big_c": 9223372036854775807, + "varchar_c": "Hello World", + "char_c": "abc", + "float_c": 123.10199737548828, + "double_c": 404.4443, + "decimal_c": "EtaH", + "numeric_c": "AVo=", + "boolean_c": 1, + "date_c": 18460, + "time_c": 64822000000, + "datetime3_c": 1595008822123, + "datetime6_c": 1595008822123456, + "timestamp_c": "2020-07-17T18:00:22Z", + "file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA==" + }, + "op": "r", + "transaction": null + }, + "expected_binlog": { + "before": { + "id": 1, + "tiny_c": 127, + "tiny_un_c": 255, + "small_c": 32767, + "small_un_c": 65535, + "int_c": 2147483647, + "int_un_c": 4294967295, + "int11_c": 2147483647, + "big_c": 9223372036854775807, + "varchar_c": "Hello World", + "char_c": "abc", + "float_c": 123.10199737548828, + "double_c": 404.4443, + "decimal_c": "EtaH", + "numeric_c": "AVo=", + "boolean_c": 1, + "date_c": 18460, + "time_c": 64822000000, + "datetime3_c": 1595008822123, + "datetime6_c": 1595008822123456, + "timestamp_c": "2020-07-17T18:00:22Z", + "file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA==" + }, + "after": { + "id": 1, + "tiny_c": 127, + "tiny_un_c": 255, + "small_c": 32767, + "small_un_c": 65535, + "int_c": 2147483647, + "int_un_c": 4294967295, + "int11_c": 2147483647, + "big_c": 9223372036854775807, + "varchar_c": "Hello World", + "char_c": "abc", + "float_c": 123.10199737548828, + "double_c": 404.4443, + "decimal_c": "EtaH", + "numeric_c": "AVo=", + "boolean_c": 1, + "date_c": 18460, + "time_c": 64822000000, + "datetime3_c": 1595008822123, + "datetime6_c": 1595008822123456, + "timestamp_c": "2020-07-17T18:33:22Z", + "file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA==" + }, + "op": "u", + "transaction": null + } +} \ No newline at end of file diff --git a/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-include.json b/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-include.json new file mode 100644 index 000000000..b58e34b1e --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/resources/file/debezium-data-schema-include.json @@ -0,0 +1,943 @@ +{ + "expected_snapshot": { + "schema": { + "type": "struct", + "fields": [ + { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "id" + }, + { + "type": "int16", + "optional": true, + "field": "tiny_c" + }, + { + "type": "int16", + "optional": true, + "field": "tiny_un_c" + }, + { + "type": "int16", + "optional": true, + "field": "small_c" + }, + { + "type": "int32", + "optional": true, + "field": "small_un_c" + }, + { + "type": "int32", + "optional": true, + "field": "int_c" + }, + { + "type": "int64", + "optional": true, + "field": "int_un_c" + }, + { + "type": "int32", + "optional": true, + "field": "int11_c" + }, + { + "type": "int64", + "optional": true, + "field": "big_c" + }, + { + "type": "string", + "optional": true, + "field": "varchar_c" + }, + { + "type": "string", + "optional": true, + "field": "char_c" + }, + { + "type": "double", + "optional": true, + "field": "float_c" + }, + { + "type": "double", + "optional": true, + "field": "double_c" + }, + { + "type": "bytes", + "optional": true, + "name": "org.apache.kafka.connect.data.Decimal", + "version": 1, + "parameters": { + "scale": "4", + "connect.decimal.precision": "8" + }, + "field": "decimal_c" + }, + { + "type": "bytes", + "optional": true, + "name": "org.apache.kafka.connect.data.Decimal", + "version": 1, + "parameters": { + "scale": "0", + "connect.decimal.precision": "6" + }, + "field": "numeric_c" + }, + { + "type": "int16", + "optional": true, + "field": "boolean_c" + }, + { + "type": "int32", + "optional": true, + "name": "io.debezium.time.Date", + "version": 1, + "field": "date_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.MicroTime", + "version": 1, + "field": "time_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.Timestamp", + "version": 1, + "field": "datetime3_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.MicroTimestamp", + "version": 1, + "field": "datetime6_c" + }, + { + "type": "string", + "optional": false, + "name": "io.debezium.time.ZonedTimestamp", + "version": 1, + "default": "1970-01-01T00:00:00Z", + "field": "timestamp_c" + }, + { + "type": "bytes", + "optional": true, + "field": "file_uuid" + } + ], + "optional": true, + "name": "mysql_binlog_source.column_type_test_2jid2b.full_types.Value", + "field": "before" + }, + { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "id" + }, + { + "type": "int16", + "optional": true, + "field": "tiny_c" + }, + { + "type": "int16", + "optional": true, + "field": "tiny_un_c" + }, + { + "type": "int16", + "optional": true, + "field": "small_c" + }, + { + "type": "int32", + "optional": true, + "field": "small_un_c" + }, + { + "type": "int32", + "optional": true, + "field": "int_c" + }, + { + "type": "int64", + "optional": true, + "field": "int_un_c" + }, + { + "type": "int32", + "optional": true, + "field": "int11_c" + }, + { + "type": "int64", + "optional": true, + "field": "big_c" + }, + { + "type": "string", + "optional": true, + "field": "varchar_c" + }, + { + "type": "string", + "optional": true, + "field": "char_c" + }, + { + "type": "double", + "optional": true, + "field": "float_c" + }, + { + "type": "double", + "optional": true, + "field": "double_c" + }, + { + "type": "bytes", + "optional": true, + "name": "org.apache.kafka.connect.data.Decimal", + "version": 1, + "parameters": { + "scale": "4", + "connect.decimal.precision": "8" + }, + "field": "decimal_c" + }, + { + "type": "bytes", + "optional": true, + "name": "org.apache.kafka.connect.data.Decimal", + "version": 1, + "parameters": { + "scale": "0", + "connect.decimal.precision": "6" + }, + "field": "numeric_c" + }, + { + "type": "int16", + "optional": true, + "field": "boolean_c" + }, + { + "type": "int32", + "optional": true, + "name": "io.debezium.time.Date", + "version": 1, + "field": "date_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.MicroTime", + "version": 1, + "field": "time_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.Timestamp", + "version": 1, + "field": "datetime3_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.MicroTimestamp", + "version": 1, + "field": "datetime6_c" + }, + { + "type": "string", + "optional": false, + "name": "io.debezium.time.ZonedTimestamp", + "version": 1, + "default": "1970-01-01T00:00:00Z", + "field": "timestamp_c" + }, + { + "type": "bytes", + "optional": true, + "field": "file_uuid" + } + ], + "optional": true, + "name": "mysql_binlog_source.column_type_test_2jid2b.full_types.Value", + "field": "after" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "version" + }, + { + "type": "string", + "optional": false, + "field": "connector" + }, + { + "type": "string", + "optional": false, + "field": "name" + }, + { + "type": "int64", + "optional": false, + "field": "ts_ms" + }, + { + "type": "string", + "optional": true, + "name": "io.debezium.data.Enum", + "version": 1, + "parameters": { + "allowed": "true,last,false" + }, + "default": "false", + "field": "snapshot" + }, + { + "type": "string", + "optional": false, + "field": "db" + }, + { + "type": "string", + "optional": true, + "field": "sequence" + }, + { + "type": "string", + "optional": true, + "field": "table" + }, + { + "type": "int64", + "optional": false, + "field": "server_id" + }, + { + "type": "string", + "optional": true, + "field": "gtid" + }, + { + "type": "string", + "optional": false, + "field": "file" + }, + { + "type": "int64", + "optional": false, + "field": "pos" + }, + { + "type": "int32", + "optional": false, + "field": "row" + }, + { + "type": "int64", + "optional": true, + "field": "thread" + }, + { + "type": "string", + "optional": true, + "field": "query" + } + ], + "optional": false, + "name": "io.debezium.connector.mysql.Source", + "field": "source" + }, + { + "type": "string", + "optional": false, + "field": "op" + }, + { + "type": "int64", + "optional": true, + "field": "ts_ms" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "id" + }, + { + "type": "int64", + "optional": false, + "field": "total_order" + }, + { + "type": "int64", + "optional": false, + "field": "data_collection_order" + } + ], + "optional": true, + "field": "transaction" + } + ], + "optional": false, + "name": "mysql_binlog_source.column_type_test_2jid2b.full_types.Envelope" + }, + "payload": { + "before": null, + "after": { + "id": 1, + "tiny_c": 127, + "tiny_un_c": 255, + "small_c": 32767, + "small_un_c": 65535, + "int_c": 2147483647, + "int_un_c": 4294967295, + "int11_c": 2147483647, + "big_c": 9223372036854775807, + "varchar_c": "Hello World", + "char_c": "abc", + "float_c": 123.10199737548828, + "double_c": 404.4443, + "decimal_c": "EtaH", + "numeric_c": "AVo=", + "boolean_c": 1, + "date_c": 18460, + "time_c": 64822000000, + "datetime3_c": 1595008822123, + "datetime6_c": 1595008822123456, + "timestamp_c": "2020-07-17T18:00:22Z", + "file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA==" + }, + "source": { + "version": "1.5.2.Final", + "connector": "mysql", + "name": "mysql_binlog_source", + "ts_ms": 1630315344066, + "snapshot": "last", + "db": "column_type_test_2jid2b", + "sequence": null, + "table": "full_types", + "server_id": 0, + "gtid": null, + "file": "mysql-bin.000003", + "pos": 1532, + "row": 0, + "thread": null, + "query": null + }, + "op": "r", + "ts_ms": 1630315344085, + "transaction": null + } + }, + "expected_binlog": { + "schema": { + "type": "struct", + "fields": [ + { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "id" + }, + { + "type": "int16", + "optional": true, + "field": "tiny_c" + }, + { + "type": "int16", + "optional": true, + "field": "tiny_un_c" + }, + { + "type": "int16", + "optional": true, + "field": "small_c" + }, + { + "type": "int32", + "optional": true, + "field": "small_un_c" + }, + { + "type": "int32", + "optional": true, + "field": "int_c" + }, + { + "type": "int64", + "optional": true, + "field": "int_un_c" + }, + { + "type": "int32", + "optional": true, + "field": "int11_c" + }, + { + "type": "int64", + "optional": true, + "field": "big_c" + }, + { + "type": "string", + "optional": true, + "field": "varchar_c" + }, + { + "type": "string", + "optional": true, + "field": "char_c" + }, + { + "type": "double", + "optional": true, + "field": "float_c" + }, + { + "type": "double", + "optional": true, + "field": "double_c" + }, + { + "type": "bytes", + "optional": true, + "name": "org.apache.kafka.connect.data.Decimal", + "version": 1, + "parameters": { + "scale": "4", + "connect.decimal.precision": "8" + }, + "field": "decimal_c" + }, + { + "type": "bytes", + "optional": true, + "name": "org.apache.kafka.connect.data.Decimal", + "version": 1, + "parameters": { + "scale": "0", + "connect.decimal.precision": "6" + }, + "field": "numeric_c" + }, + { + "type": "int16", + "optional": true, + "field": "boolean_c" + }, + { + "type": "int32", + "optional": true, + "name": "io.debezium.time.Date", + "version": 1, + "field": "date_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.MicroTime", + "version": 1, + "field": "time_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.Timestamp", + "version": 1, + "field": "datetime3_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.MicroTimestamp", + "version": 1, + "field": "datetime6_c" + }, + { + "type": "string", + "optional": false, + "name": "io.debezium.time.ZonedTimestamp", + "version": 1, + "default": "1970-01-01T00:00:00Z", + "field": "timestamp_c" + }, + { + "type": "bytes", + "optional": true, + "field": "file_uuid" + } + ], + "optional": true, + "name": "mysql_binlog_source.column_type_test_13p6nkc.full_types.Value", + "field": "before" + }, + { + "type": "struct", + "fields": [ + { + "type": "int32", + "optional": false, + "field": "id" + }, + { + "type": "int16", + "optional": true, + "field": "tiny_c" + }, + { + "type": "int16", + "optional": true, + "field": "tiny_un_c" + }, + { + "type": "int16", + "optional": true, + "field": "small_c" + }, + { + "type": "int32", + "optional": true, + "field": "small_un_c" + }, + { + "type": "int32", + "optional": true, + "field": "int_c" + }, + { + "type": "int64", + "optional": true, + "field": "int_un_c" + }, + { + "type": "int32", + "optional": true, + "field": "int11_c" + }, + { + "type": "int64", + "optional": true, + "field": "big_c" + }, + { + "type": "string", + "optional": true, + "field": "varchar_c" + }, + { + "type": "string", + "optional": true, + "field": "char_c" + }, + { + "type": "double", + "optional": true, + "field": "float_c" + }, + { + "type": "double", + "optional": true, + "field": "double_c" + }, + { + "type": "bytes", + "optional": true, + "name": "org.apache.kafka.connect.data.Decimal", + "version": 1, + "parameters": { + "scale": "4", + "connect.decimal.precision": "8" + }, + "field": "decimal_c" + }, + { + "type": "bytes", + "optional": true, + "name": "org.apache.kafka.connect.data.Decimal", + "version": 1, + "parameters": { + "scale": "0", + "connect.decimal.precision": "6" + }, + "field": "numeric_c" + }, + { + "type": "int16", + "optional": true, + "field": "boolean_c" + }, + { + "type": "int32", + "optional": true, + "name": "io.debezium.time.Date", + "version": 1, + "field": "date_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.MicroTime", + "version": 1, + "field": "time_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.Timestamp", + "version": 1, + "field": "datetime3_c" + }, + { + "type": "int64", + "optional": true, + "name": "io.debezium.time.MicroTimestamp", + "version": 1, + "field": "datetime6_c" + }, + { + "type": "string", + "optional": false, + "name": "io.debezium.time.ZonedTimestamp", + "version": 1, + "default": "1970-01-01T00:00:00Z", + "field": "timestamp_c" + }, + { + "type": "bytes", + "optional": true, + "field": "file_uuid" + } + ], + "optional": true, + "name": "mysql_binlog_source.column_type_test_13p6nkc.full_types.Value", + "field": "after" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "version" + }, + { + "type": "string", + "optional": false, + "field": "connector" + }, + { + "type": "string", + "optional": false, + "field": "name" + }, + { + "type": "int64", + "optional": false, + "field": "ts_ms" + }, + { + "type": "string", + "optional": true, + "name": "io.debezium.data.Enum", + "version": 1, + "parameters": { + "allowed": "true,last,false" + }, + "default": "false", + "field": "snapshot" + }, + { + "type": "string", + "optional": false, + "field": "db" + }, + { + "type": "string", + "optional": true, + "field": "sequence" + }, + { + "type": "string", + "optional": true, + "field": "table" + }, + { + "type": "int64", + "optional": false, + "field": "server_id" + }, + { + "type": "string", + "optional": true, + "field": "gtid" + }, + { + "type": "string", + "optional": false, + "field": "file" + }, + { + "type": "int64", + "optional": false, + "field": "pos" + }, + { + "type": "int32", + "optional": false, + "field": "row" + }, + { + "type": "int64", + "optional": true, + "field": "thread" + }, + { + "type": "string", + "optional": true, + "field": "query" + } + ], + "optional": false, + "name": "io.debezium.connector.mysql.Source", + "field": "source" + }, + { + "type": "string", + "optional": false, + "field": "op" + }, + { + "type": "int64", + "optional": true, + "field": "ts_ms" + }, + { + "type": "struct", + "fields": [ + { + "type": "string", + "optional": false, + "field": "id" + }, + { + "type": "int64", + "optional": false, + "field": "total_order" + }, + { + "type": "int64", + "optional": false, + "field": "data_collection_order" + } + ], + "optional": true, + "field": "transaction" + } + ], + "optional": false, + "name": "mysql_binlog_source.column_type_test_13p6nkc.full_types.Envelope" + }, + "payload": { + "before": { + "id": 1, + "tiny_c": 127, + "tiny_un_c": 255, + "small_c": 32767, + "small_un_c": 65535, + "int_c": 2147483647, + "int_un_c": 4294967295, + "int11_c": 2147483647, + "big_c": 9223372036854775807, + "varchar_c": "Hello World", + "char_c": "abc", + "float_c": 123.10199737548828, + "double_c": 404.4443, + "decimal_c": "EtaH", + "numeric_c": "AVo=", + "boolean_c": 1, + "date_c": 18460, + "time_c": 64822000000, + "datetime3_c": 1595008822123, + "datetime6_c": 1595008822123456, + "timestamp_c": "2020-07-17T18:00:22Z", + "file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA==" + }, + "after": { + "id": 1, + "tiny_c": 127, + "tiny_un_c": 255, + "small_c": 32767, + "small_un_c": 65535, + "int_c": 2147483647, + "int_un_c": 4294967295, + "int11_c": 2147483647, + "big_c": 9223372036854775807, + "varchar_c": "Hello World", + "char_c": "abc", + "float_c": 123.10199737548828, + "double_c": 404.4443, + "decimal_c": "EtaH", + "numeric_c": "AVo=", + "boolean_c": 1, + "date_c": 18460, + "time_c": 64822000000, + "datetime3_c": 1595008822123, + "datetime6_c": 1595008822123456, + "timestamp_c": "2020-07-17T18:33:22Z", + "file_uuid": "ZRrtCDkPSJOy8TaSPnt0AA==" + }, + "source": { + "version": "1.5.2.Final", + "connector": "mysql", + "name": "mysql_binlog_source", + "ts_ms": 1630324170000, + "snapshot": "false", + "db": "column_type_test_13p6nkc", + "sequence": null, + "table": "full_types", + "server_id": 223344, + "gtid": null, + "file": "mysql-bin.000003", + "pos": 1816, + "row": 0, + "thread": null, + "query": null + }, + "op": "u", + "ts_ms": 1630324170753, + "transaction": null + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index fcfcf5113..36fec5e58 100644 --- a/pom.xml +++ b/pom.xml @@ -185,6 +185,7 @@ under the License. docs/_templates/version.html **/*.txt + flink-connector-mysql-cdc/src/test/resources/file/*.json