From 174282d3ef0d57488d6fb13ba684e2318bec2079 Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Mon, 4 Dec 2023 23:04:41 +0800 Subject: [PATCH] [cdc-pipeline-connector][mysql] Support more mysql types --- .../cdc/common/event/EventDeserializer.java | 4 +- .../mysql/factory/MySqlDataSourceFactory.java | 39 +- .../mysql/source/MySqlDataSource.java | 6 + .../mysql/source/MySqlDataSourceOptions.java | 3 +- .../mysql/source/MySqlEventDeserializer.java | 53 +- .../source/MySqlSchemaDataTypeInference.java | 43 + .../reader/MySqlPipelineRecordEmitter.java | 4 +- .../mysql/utils/MySqlSchemaUtils.java | 3 +- .../mysql/utils/MySqlTypeUtils.java} | 58 +- .../event/ConnectSchemaTypeInference.java | 67 - .../DebeziumEventDeserializationSchema.java | 478 +++---- .../DebeziumSchemaDataTypeInference.java | 198 +++ .../event/SchemaDataTypeInference.java | 35 + .../event/SourceRecordEventDeserializer.java | 4 +- .../mysql/schema/MySqlTypeUtils.java | 56 +- .../connectors/mysql/source/MySqlSource.java | 3 +- .../connector/mysql/MySqlValueConverters.java | 1120 +++++++++++++++++ 17 files changed, 1773 insertions(+), 401 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSchemaDataTypeInference.java rename flink-cdc-connect/{flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlCdcCommonTypeUtils.java => flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlTypeUtils.java} (78%) delete mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/ConnectSchemaTypeInference.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumSchemaDataTypeInference.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SchemaDataTypeInference.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/EventDeserializer.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/EventDeserializer.java index 1673b6d31..7df1d9d1f 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/EventDeserializer.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/EventDeserializer.java @@ -16,13 +16,13 @@ package com.ververica.cdc.common.event; -import com.ververica.cdc.common.annotation.Public; +import com.ververica.cdc.common.annotation.PublicEvolving; import java.io.Serializable; import java.util.List; /** Deserializer to deserialize given record to {@link Event}. */ -@Public +@PublicEvolving public interface EventDeserializer extends Serializable { /** Deserialize given record to {@link Event}s. 
*/ diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index ad8332220..dfd7af012 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -85,7 +85,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory { @Override public DataSource createDataSource(Context context) { - final Configuration config = context.getConfiguration(); + final Configuration config = context.getFactoryConfiguration(); String hostname = config.get(HOSTNAME); int port = config.get(PORT); @@ -103,21 +103,17 @@ public class MySqlDataSourceFactory implements DataSourceFactory { int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE); - double distributionFactorUpper = - config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); - double distributionFactorLower = - config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + double distributionFactorUpper = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); - boolean closeIdleReaders = - config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); Duration connectTimeout = config.get(CONNECT_TIMEOUT); int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); - validateIntegerOption( - SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); + validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1); validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1); @@ -154,7 +150,12 @@ public class MySqlDataSourceFactory implements DataSourceFactory { .jdbcProperties(getJdbcProperties(configMap)); Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build(); - configFactory.tableList(getTableList(configFactory.createConfig(0), selectors)); + String[] capturedTables = getTableList(configFactory.createConfig(0), selectors); + if (capturedTables.length == 0) { + throw new IllegalArgumentException( + "Cannot find any table by the option 'tables' = " + tables); + } + configFactory.tableList(capturedTables); return new MySqlDataSource(configFactory); } @@ -232,8 +233,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory { return getSpecificOffset(config); case SCAN_STARTUP_MODE_VALUE_TIMESTAMP: - return StartupOptions.timestamp( - config.get(SCAN_STARTUP_TIMESTAMP_MILLIS)); + return StartupOptions.timestamp(config.get(SCAN_STARTUP_TIMESTAMP_MILLIS)); default: throw new ValidationException( @@ -250,12 +250,9 @@ public class MySqlDataSourceFactory implements DataSourceFactory { } private static void 
validateSpecificOffset(Configuration config) { - Optional gtidSet = - config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET); - Optional binlogFilename = - config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE); - Optional binlogPosition = - config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS); + Optional gtidSet = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET); + Optional binlogFilename = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE); + Optional binlogPosition = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS); if (!gtidSet.isPresent() && !(binlogFilename.isPresent() && binlogPosition.isPresent())) { throw new ValidationException( String.format( @@ -274,10 +271,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory { .ifPresent(offsetBuilder::setGtidSet); // Binlog file + pos - Optional binlogFilename = - config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE); - Optional binlogPosition = - config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS); + Optional binlogFilename = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE); + Optional binlogPosition = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS); if (binlogFilename.isPresent() && binlogPosition.isPresent()) { offsetBuilder.setBinlogFilePosition(binlogFilename.get(), binlogPosition.get()); } else { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java index bb948707a..cf50d5b84 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSource.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.mysql.source; import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.annotation.VisibleForTesting; import com.ververica.cdc.common.event.Event; import com.ververica.cdc.common.source.DataSource; import com.ververica.cdc.common.source.EventSourceProvider; @@ -64,4 +65,9 @@ public class MySqlDataSource implements DataSource { public MetadataAccessor getMetadataAccessor() { return new MySqlMetadataAccessor(sourceConfig); } + + @VisibleForTesting + public MySqlSourceConfig getSourceConfig() { + return sourceConfig; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 72cbdf6fd..3e33e23db 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -228,5 +228,6 @@ public class MySqlDataSourceOptions { ConfigOptions.key("schema-change.enabled") .booleanType() .defaultValue(false) - .withDescription("Whether , by default is false."); + 
.withDescription(
+                            "Whether to send schema change events, by default false. If set to false, schema change events will not be sent.");
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java
index 70e92c104..867e9a926 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlEventDeserializer.java
@@ -16,13 +16,19 @@
 package com.ververica.cdc.connectors.mysql.source;
 
+import com.esri.core.geometry.ogc.OGCGeometry;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.ververica.cdc.common.annotation.Internal;
+import com.ververica.cdc.common.data.binary.BinaryStringData;
 import com.ververica.cdc.common.event.SchemaChangeEvent;
 import com.ververica.cdc.common.event.TableId;
 import com.ververica.cdc.connectors.mysql.source.parser.CustomMySqlAntlrDdlParser;
 import com.ververica.cdc.debezium.event.DebeziumEventDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
 import io.debezium.data.Envelope;
+import io.debezium.data.geometry.Geometry;
+import io.debezium.data.geometry.Point;
 import io.debezium.relational.Tables;
 import io.debezium.relational.history.HistoryRecord;
 import org.apache.kafka.connect.data.Schema;
@@ -30,10 +36,13 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord;
 
@@ -46,6 +55,8 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
     public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
             "io.debezium.connector.mysql.SchemaChangeKey";
 
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     private final boolean includeSchemaChanges;
 
     private transient Tables tables;
@@ -55,7 +66,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
             DebeziumChangelogMode changelogMode,
             ZoneId serverTimeZone,
             boolean includeSchemaChanges) {
-        super(changelogMode, serverTimeZone);
+        super(new MySqlSchemaDataTypeInference(), changelogMode, serverTimeZone);
         this.includeSchemaChanges = includeSchemaChanges;
     }
 
@@ -102,16 +113,44 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
 
     @Override
     protected TableId getTableId(SourceRecord record) {
-        if (isDataChangeRecord(record)) {
-            String[] parts = record.topic().split("\\.");
-            return TableId.tableId(parts[1], parts[2]);
-        }
-        // TODO: get table id from schema change record
-        return null;
+        String[] parts = record.topic().split("\\.");
+        return TableId.tableId(parts[1], parts[2]);
     }
 
     @Override
     protected Map<String, String> getMetadata(SourceRecord record) {
         return Collections.emptyMap();
     }
+
+    @Override
+    protected Object convertToString(Object dbzObj, Schema schema) {
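+        // Hypothetical illustration of the geometry branch below: a MySQL POINT value
+        // arrives as a Debezium Struct with fields `wkb` (well-known-binary bytes) and
+        // `srid`. For POINT(1 1) with no SRID, the emitted string is, up to key order,
+        // {"coordinates":[1.0,1.0],"type":"Point","srid":0}.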
+        // The Geometry datatype in MySQL is converted to
+        // a JSON-formatted String
+        if (Point.LOGICAL_NAME.equals(schema.name())
+                || Geometry.LOGICAL_NAME.equals(schema.name())) {
+            try {
+                Struct geometryStruct = (Struct) dbzObj;
+                byte[] wkb = geometryStruct.getBytes("wkb");
+                String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkb)).asGeoJson();
+                JsonNode originGeoNode = OBJECT_MAPPER.readTree(geoJson);
+                Optional<Integer> srid = Optional.ofNullable(geometryStruct.getInt32("srid"));
+                Map<String, Object> geometryInfo = new HashMap<>();
+                String geometryType = originGeoNode.get("type").asText();
+                geometryInfo.put("type", geometryType);
+                if (geometryType.equals("GeometryCollection")) {
+                    geometryInfo.put("geometries", originGeoNode.get("geometries"));
+                } else {
+                    geometryInfo.put("coordinates", originGeoNode.get("coordinates"));
+                }
+                geometryInfo.put("srid", srid.orElse(0));
+                return BinaryStringData.fromString(
+                        OBJECT_MAPPER.writer().writeValueAsString(geometryInfo));
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        String.format("Failed to convert %s to geometry JSON.", dbzObj), e);
+            }
+        } else {
+            return BinaryStringData.fromString(dbzObj.toString());
+        }
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSchemaDataTypeInference.java
new file mode 100644
index 000000000..213d973bc
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSchemaDataTypeInference.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.source;
+
+import com.ververica.cdc.common.annotation.Internal;
+import com.ververica.cdc.common.types.DataType;
+import com.ververica.cdc.common.types.DataTypes;
+import com.ververica.cdc.debezium.event.DebeziumSchemaDataTypeInference;
+import io.debezium.data.geometry.Geometry;
+import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Schema;
+
+/** {@link DataType} inference for MySQL Debezium {@link Schema}.
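+ *
+ * <p>For example, a struct whose schema is {@code io.debezium.data.geometry.Point} or
+ * {@code io.debezium.data.geometry.Geometry} is inferred as {@code DataTypes.STRING()}
+ * rather than a ROW of its fields, because geometry values are rendered as GeoJSON
+ * strings by {@code MySqlEventDeserializer#convertToString}.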
+ */
+@Internal
+public class MySqlSchemaDataTypeInference extends DebeziumSchemaDataTypeInference {
+
+    private static final long serialVersionUID = 1L;
+
+    protected DataType inferStruct(Object value, Schema schema) {
+        // The Geometry datatype in MySQL is converted to
+        // a JSON-formatted String
+        if (Point.LOGICAL_NAME.equals(schema.name())
+                || Geometry.LOGICAL_NAME.equals(schema.name())) {
+            return DataTypes.STRING();
+        } else {
+            return super.inferStruct(value, schema);
+        }
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index fd14e1246..4c099efc7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -23,13 +23,13 @@ import com.ververica.cdc.common.event.CreateTableEvent;
 import com.ververica.cdc.common.event.Event;
 import com.ververica.cdc.common.schema.Schema;
 import com.ververica.cdc.common.types.DataType;
-import com.ververica.cdc.connectors.mysql.schema.MySqlCdcCommonTypeUtils;
 import com.ververica.cdc.connectors.mysql.schema.MySqlFieldDefinition;
 import com.ververica.cdc.connectors.mysql.schema.MySqlTableDefinition;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
 import com.ververica.cdc.connectors.mysql.table.StartupMode;
+import com.ververica.cdc.connectors.mysql.utils.MySqlTypeUtils;
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
 import io.debezium.jdbc.JdbcConnection;
@@ -200,7 +200,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter {
             Column column = columns.get(i);
             String colName = column.name();
-            DataType dataType = MySqlCdcCommonTypeUtils.fromDbzColumn(column);
+            DataType dataType = MySqlTypeUtils.fromDbzColumn(column);
             if (!column.isOptional()) {
                 dataType = dataType.notNull();
             }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlSchemaUtils.java
index caa01c98b..8eacd95fe 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlSchemaUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlSchemaUtils.java
@@ -19,7 +19,6 @@ package com.ververica.cdc.connectors.mysql.utils;
 import com.ververica.cdc.common.event.TableId;
 import com.ververica.cdc.common.schema.Column;
 import com.ververica.cdc.common.schema.Schema;
-import
com.ververica.cdc.connectors.mysql.schema.MySqlCdcCommonTypeUtils; import com.ververica.cdc.connectors.mysql.schema.MySqlSchema; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import io.debezium.connector.mysql.MySqlConnection; @@ -147,7 +146,7 @@ public class MySqlSchemaUtils { public static Column toColumn(io.debezium.relational.Column column) { return Column.physicalColumn( - column.name(), MySqlCdcCommonTypeUtils.fromDbzColumn(column), column.comment()); + column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment()); } public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlCdcCommonTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlTypeUtils.java similarity index 78% rename from flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlCdcCommonTypeUtils.java rename to flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlTypeUtils.java index 20d55dc00..cd4111954 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlCdcCommonTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -14,18 +14,24 @@ * limitations under the License. */ -package com.ververica.cdc.connectors.mysql.schema; +package com.ververica.cdc.connectors.mysql.utils; import com.ververica.cdc.common.types.DataType; import com.ververica.cdc.common.types.DataTypes; import io.debezium.relational.Column; /** Utilities for converting from MySQL types to {@link DataType}s. 
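+ *
+ * <p>Sample mappings implemented below: {@code BIT(1)} to BOOLEAN, {@code BIT(16)} to
+ * BINARY(2), {@code DATETIME(6)} to TIMESTAMP(6), {@code TIMESTAMP(3)} to
+ * TIMESTAMP_LTZ(3), and {@code SET} to ARRAY of STRING.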
*/ -public class MySqlCdcCommonTypeUtils { +public class MySqlTypeUtils { // ------ MySQL Type ------ // https://dev.mysql.com/doc/refman/8.0/en/data-types.html private static final String BIT = "BIT"; + /* + * BOOLEAN type will be returned when handling the change event from SQL like: + * ALTER TABLE `student` CHANGE COLUMN `is_male` `is_female` BOOLEAN NULL; + */ + private static final String BOOLEAN = "BOOLEAN"; + private static final String BOOL = "BOOL"; private static final String TINYINT = "TINYINT"; private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED"; private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL"; @@ -38,6 +44,9 @@ public class MySqlCdcCommonTypeUtils { private static final String INT = "INT"; private static final String INT_UNSIGNED = "INT UNSIGNED"; private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL"; + private static final String INTEGER = "INTEGER"; + private static final String INTEGER_UNSIGNED = "INTEGER UNSIGNED"; + private static final String INTEGER_UNSIGNED_ZEROFILL = "INTEGER UNSIGNED ZEROFILL"; private static final String BIGINT = "BIGINT"; private static final String SERIAL = "SERIAL"; private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED"; @@ -85,6 +94,14 @@ public class MySqlCdcCommonTypeUtils { private static final String SET = "SET"; private static final String ENUM = "ENUM"; private static final String GEOMETRY = "GEOMETRY"; + private static final String POINT = "POINT"; + private static final String LINESTRING = "LINESTRING"; + private static final String POLYGON = "POLYGON"; + private static final String GEOMCOLLECTION = "GEOMCOLLECTION"; + private static final String GEOMETRYCOLLECTION = "GEOMETRYCOLLECTION"; + private static final String MULTIPOINT = "MULTIPOINT"; + private static final String MULTIPOLYGON = "MULTIPOLYGON"; + private static final String MULTILINESTRING = "MULTILINESTRING"; private static final String UNKNOWN = "UNKNOWN"; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ @@ -104,6 +121,13 @@ public class MySqlCdcCommonTypeUtils { private static DataType convertFromColumn(Column column) { String typeName = column.typeName(); switch (typeName) { + case BIT: + return column.length() == 1 + ? DataTypes.BOOLEAN() + : DataTypes.BINARY(column.length() / 8); + case BOOL: + case BOOLEAN: + return DataTypes.BOOLEAN(); case TINYINT: // MySQL haven't boolean type, it uses tinyint(1) to represents boolean type // user should not use tinyint(1) to store number although jdbc url parameter @@ -117,10 +141,14 @@ public class MySqlCdcCommonTypeUtils { case SMALLINT_UNSIGNED: case SMALLINT_UNSIGNED_ZEROFILL: case INT: + case INTEGER: case MEDIUMINT: + case YEAR: return DataTypes.INT(); case INT_UNSIGNED: case INT_UNSIGNED_ZEROFILL: + case INTEGER_UNSIGNED: + case INTEGER_UNSIGNED_ZEROFILL: case MEDIUMINT_UNSIGNED: case MEDIUMINT_UNSIGNED_ZEROFILL: case BIGINT: @@ -160,27 +188,49 @@ public class MySqlCdcCommonTypeUtils { case DATE: return DataTypes.DATE(); case DATETIME: + return column.length() >= 0 + ? DataTypes.TIMESTAMP(column.length()) + : DataTypes.TIMESTAMP(0); case TIMESTAMP: return column.length() >= 0 ? 
DataTypes.TIMESTAMP_LTZ(column.length()) - : DataTypes.TIMESTAMP_LTZ(); + : DataTypes.TIMESTAMP_LTZ(0); case CHAR: return DataTypes.CHAR(column.length()); case VARCHAR: return DataTypes.VARCHAR(column.length()); + case TINYTEXT: case TEXT: + case MEDIUMTEXT: + case LONGTEXT: + case JSON: + case ENUM: + case GEOMETRY: + case POINT: + case LINESTRING: + case POLYGON: + case GEOMETRYCOLLECTION: + case GEOMCOLLECTION: + case MULTIPOINT: + case MULTIPOLYGON: + case MULTILINESTRING: return DataTypes.STRING(); case BINARY: return DataTypes.BINARY(column.length()); case VARBINARY: return DataTypes.VARBINARY(column.length()); + case TINYBLOB: case BLOB: + case MEDIUMBLOB: + case LONGBLOB: return DataTypes.BYTES(); + case SET: + return DataTypes.ARRAY(DataTypes.STRING()); default: throw new UnsupportedOperationException( String.format("Don't support MySQL type '%s' yet.", typeName)); } } - private MySqlCdcCommonTypeUtils() {} + private MySqlTypeUtils() {} } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/ConnectSchemaTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/ConnectSchemaTypeInference.java deleted file mode 100644 index f1eeb6235..000000000 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/ConnectSchemaTypeInference.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2023 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.debezium.event; - -import com.ververica.cdc.common.types.DataField; -import com.ververica.cdc.common.types.DataType; -import com.ververica.cdc.common.types.DataTypes; -import org.apache.kafka.connect.data.Schema; - -/** Utility class to convert {@link Schema} to {@link DataType}. */ -public class ConnectSchemaTypeInference { - - public static DataType infer(Schema schema) { - return schema.isOptional() - ? 
infer(schema, schema.type()) - : infer(schema, schema.type()).notNull(); - } - - private static DataType infer(Schema schema, Schema.Type type) { - switch (type) { - case INT8: - return DataTypes.TINYINT(); - case INT16: - return DataTypes.SMALLINT(); - case INT32: - return DataTypes.INT(); - case INT64: - return DataTypes.BIGINT(); - case FLOAT32: - return DataTypes.FLOAT(); - case FLOAT64: - return DataTypes.DOUBLE(); - case BOOLEAN: - return DataTypes.BOOLEAN(); - case STRING: - return DataTypes.STRING(); - case BYTES: - return DataTypes.BYTES(); - case ARRAY: - return DataTypes.ARRAY(infer(schema.valueSchema())); - case MAP: - return DataTypes.MAP(infer(schema.keySchema()), infer(schema.valueSchema())); - case STRUCT: - return DataTypes.ROW( - schema.fields().stream() - .map(f -> DataTypes.FIELD(f.name(), infer(f.schema()))) - .toArray(DataField[]::new)); - default: - throw new UnsupportedOperationException( - "Unsupported type: " + schema.type().getName()); - } - } -} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java index cdaa95614..d6798c65d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -76,6 +76,9 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve private static final Map CONVERTERS = new ConcurrentHashMap<>(); + /** The schema data type inference. */ + protected final SchemaDataTypeInference schemaDataTypeInference; + /** Changelog Mode to use for encoding changes in Flink internal data structure. 
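+     * For example, with {@code DebeziumChangelogMode.ALL} an update is encoded as an
+     * update-before and an update-after record, while {@code UPSERT} keeps only the
+     * after image.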
*/ protected final DebeziumChangelogMode changelogMode; @@ -83,7 +86,10 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve protected final ZoneId serverTimeZone; public DebeziumEventDeserializationSchema( - DebeziumChangelogMode changelogMode, ZoneId serverTimeZone) { + SchemaDataTypeInference schemaDataTypeInference, + DebeziumChangelogMode changelogMode, + ZoneId serverTimeZone) { + this.schemaDataTypeInference = schemaDataTypeInference; this.changelogMode = changelogMode; this.serverTimeZone = serverTimeZone; } @@ -141,14 +147,12 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve } private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception { - DataType dataType = ConnectSchemaTypeInference.infer(valueSchema); - return (RecordData) - getOrCreateConverter(dataType, serverTimeZone).convert(value, valueSchema); + DataType dataType = schemaDataTypeInference.infer(value, valueSchema); + return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema); } - private static DeserializationRuntimeConverter getOrCreateConverter( - DataType type, ZoneId serverTimeZone) { - return CONVERTERS.computeIfAbsent(type, t -> createConverter(t, serverTimeZone)); + private DeserializationRuntimeConverter getOrCreateConverter(DataType type) { + return CONVERTERS.computeIfAbsent(type, this::createConverter); } // ------------------------------------------------------------------------------------- @@ -156,9 +160,8 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve // ------------------------------------------------------------------------------------- /** Creates a runtime converter which is null safe. */ - private static DeserializationRuntimeConverter createConverter( - DataType type, ZoneId serverTimeZone) { - return wrapIntoNullableConverter(createNotNullConverter(type, serverTimeZone)); + private DeserializationRuntimeConverter createConverter(DataType type) { + return wrapIntoNullableConverter(createNotNullConverter(type)); } // -------------------------------------------------------------------------------- @@ -168,58 +171,57 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve // -------------------------------------------------------------------------------- /** Creates a runtime converter which assuming input object is not null. 
*/ - public static DeserializationRuntimeConverter createNotNullConverter( - DataType type, ZoneId serverTimeZone) { + protected DeserializationRuntimeConverter createNotNullConverter(DataType type) { // if no matched user defined converter, fallback to the default converter switch (type.getTypeRoot()) { case BOOLEAN: - return convertToBoolean(); + return this::convertToBoolean; case TINYINT: - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - return Byte.parseByte(dbzObj.toString()); - } - }; + return this::convertToByte; case SMALLINT: - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - return Short.parseShort(dbzObj.toString()); - } - }; + return this::convertToShort; case INTEGER: - return convertToInt(); + return this::convertToInt; case BIGINT: - return convertToLong(); + return this::convertToLong; case DATE: - return convertToDate(); + return this::convertToDate; case TIME_WITHOUT_TIME_ZONE: - return convertToTime(); + return this::convertToTime; case TIMESTAMP_WITHOUT_TIME_ZONE: - return convertToTimestamp(serverTimeZone); + return this::convertToTimestamp; case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return convertToLocalTimeZoneTimestamp(serverTimeZone); + return this::convertToLocalTimeZoneTimestamp; case FLOAT: - return convertToFloat(); + return this::convertToFloat; case DOUBLE: - return convertToDouble(); + return this::convertToDouble; case CHAR: case VARCHAR: - return convertToString(); + return this::convertToString; case BINARY: case VARBINARY: - return convertToBinary(); + return this::convertToBinary; case DECIMAL: - return createDecimalConverter((DecimalType) type); + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) { + return convertToDecimal((DecimalType) type, dbzObj, schema); + } + }; case ROW: - return createRowConverter((RowType) type, serverTimeZone); + return new DeserializationRuntimeConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Object dbzObj, Schema schema) throws Exception { + return convertToRecord((RowType) type, dbzObj, schema); + } + }; case ARRAY: case MAP: default: @@ -227,288 +229,186 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve } } - private static DeserializationRuntimeConverter convertToBoolean() { - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - if (dbzObj instanceof Boolean) { - return dbzObj; - } else if (dbzObj instanceof Byte) { - return (byte) dbzObj == 1; - } else if (dbzObj instanceof Short) { - return (short) dbzObj == 1; - } else { - return Boolean.parseBoolean(dbzObj.toString()); - } - } - }; + protected Object convertToBoolean(Object dbzObj, Schema schema) { + if (dbzObj instanceof Boolean) { + return dbzObj; + } else if (dbzObj instanceof Byte) { + return (byte) dbzObj == 1; + } else if (dbzObj instanceof Short) { + return (short) dbzObj == 1; + } else { + return Boolean.parseBoolean(dbzObj.toString()); + } } - private static DeserializationRuntimeConverter convertToInt() { - return new DeserializationRuntimeConverter() { - - private static final long 
serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - if (dbzObj instanceof Integer) { - return dbzObj; - } else if (dbzObj instanceof Long) { - return ((Long) dbzObj).intValue(); - } else { - return Integer.parseInt(dbzObj.toString()); - } - } - }; + protected Object convertToByte(Object dbzObj, Schema schema) { + return Byte.parseByte(dbzObj.toString()); } - private static DeserializationRuntimeConverter convertToLong() { - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - if (dbzObj instanceof Integer) { - return ((Integer) dbzObj).longValue(); - } else if (dbzObj instanceof Long) { - return dbzObj; - } else { - return Long.parseLong(dbzObj.toString()); - } - } - }; + protected Object convertToShort(Object dbzObj, Schema schema) { + return Short.parseShort(dbzObj.toString()); } - private static DeserializationRuntimeConverter convertToDouble() { - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - if (dbzObj instanceof Float) { - return ((Float) dbzObj).doubleValue(); - } else if (dbzObj instanceof Double) { - return dbzObj; - } else { - return Double.parseDouble(dbzObj.toString()); - } - } - }; + protected Object convertToInt(Object dbzObj, Schema schema) { + if (dbzObj instanceof Integer) { + return dbzObj; + } else if (dbzObj instanceof Long) { + return ((Long) dbzObj).intValue(); + } else { + return Integer.parseInt(dbzObj.toString()); + } } - private static DeserializationRuntimeConverter convertToFloat() { - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - if (dbzObj instanceof Float) { - return dbzObj; - } else if (dbzObj instanceof Double) { - return ((Double) dbzObj).floatValue(); - } else { - return Float.parseFloat(dbzObj.toString()); - } - } - }; + protected Object convertToLong(Object dbzObj, Schema schema) { + if (dbzObj instanceof Integer) { + return ((Integer) dbzObj).longValue(); + } else if (dbzObj instanceof Long) { + return dbzObj; + } else { + return Long.parseLong(dbzObj.toString()); + } } - private static DeserializationRuntimeConverter convertToDate() { - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay(); - } - }; + protected Object convertToDouble(Object dbzObj, Schema schema) { + if (dbzObj instanceof Float) { + return ((Float) dbzObj).doubleValue(); + } else if (dbzObj instanceof Double) { + return dbzObj; + } else { + return Double.parseDouble(dbzObj.toString()); + } } - private static DeserializationRuntimeConverter convertToTime() { - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - if (dbzObj instanceof Long) { - switch (schema.name()) { - case MicroTime.SCHEMA_NAME: - return (int) ((long) dbzObj / 1000); - case NanoTime.SCHEMA_NAME: - return (int) ((long) dbzObj / 1000_000); - } - } else if (dbzObj instanceof Integer) { - return dbzObj; - } - // get number of milliseconds of the day - return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 
1000; - } - }; + protected Object convertToFloat(Object dbzObj, Schema schema) { + if (dbzObj instanceof Float) { + return dbzObj; + } else if (dbzObj instanceof Double) { + return ((Double) dbzObj).floatValue(); + } else { + return Float.parseFloat(dbzObj.toString()); + } } - private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) { - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - if (dbzObj instanceof Long) { - switch (schema.name()) { - case Timestamp.SCHEMA_NAME: - return TimestampData.fromMillis((Long) dbzObj); - case MicroTimestamp.SCHEMA_NAME: - long micro = (long) dbzObj; - return TimestampData.fromMillis( - micro / 1000, (int) (micro % 1000 * 1000)); - case NanoTimestamp.SCHEMA_NAME: - long nano = (long) dbzObj; - return TimestampData.fromMillis( - nano / 1000_000, (int) (nano % 1000_000)); - } - } - LocalDateTime localDateTime = - TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); - return TimestampData.fromLocalDateTime(localDateTime); - } - }; + protected Object convertToDate(Object dbzObj, Schema schema) { + return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay(); } - private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp( - ZoneId serverTimeZone) { - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - if (dbzObj instanceof String) { - String str = (String) dbzObj; - // TIMESTAMP_LTZ type is encoded in string type - Instant instant = Instant.parse(str); - return TimestampData.fromLocalDateTime( - LocalDateTime.ofInstant(instant, serverTimeZone)); - } - throw new IllegalArgumentException( - "Unable to convert to TimestampData from unexpected value '" - + dbzObj - + "' of type " - + dbzObj.getClass().getName()); + protected Object convertToTime(Object dbzObj, Schema schema) { + if (dbzObj instanceof Long) { + switch (schema.name()) { + case MicroTime.SCHEMA_NAME: + return (int) ((long) dbzObj / 1000); + case NanoTime.SCHEMA_NAME: + return (int) ((long) dbzObj / 1000_000); } - }; + } else if (dbzObj instanceof Integer) { + return dbzObj; + } + // get number of milliseconds of the day + return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000; } - private static DeserializationRuntimeConverter convertToString() { - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - return BinaryStringData.fromString(dbzObj.toString()); + protected Object convertToTimestamp(Object dbzObj, Schema schema) { + if (dbzObj instanceof Long) { + switch (schema.name()) { + case Timestamp.SCHEMA_NAME: + return TimestampData.fromMillis((Long) dbzObj); + case MicroTimestamp.SCHEMA_NAME: + long micro = (long) dbzObj; + return TimestampData.fromMillis(micro / 1000, (int) (micro % 1000 * 1000)); + case NanoTimestamp.SCHEMA_NAME: + long nano = (long) dbzObj; + return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000)); } - }; + } + LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); + return TimestampData.fromLocalDateTime(localDateTime); } - private static DeserializationRuntimeConverter convertToBinary() { - return new DeserializationRuntimeConverter() { + protected Object convertToLocalTimeZoneTimestamp(Object 
dbzObj, Schema schema) { + if (dbzObj instanceof String) { + String str = (String) dbzObj; + // TIMESTAMP_LTZ type is encoded in string type + Instant instant = Instant.parse(str); + return TimestampData.fromMillis(instant.toEpochMilli(), instant.getNano()); + } + throw new IllegalArgumentException( + "Unable to convert to TimestampData from unexpected value '" + + dbzObj + + "' of type " + + dbzObj.getClass().getName()); + } - private static final long serialVersionUID = 1L; + protected Object convertToString(Object dbzObj, Schema schema) { + return BinaryStringData.fromString(dbzObj.toString()); + } - @Override - public Object convert(Object dbzObj, Schema schema) { - if (dbzObj instanceof byte[]) { - return dbzObj; - } else if (dbzObj instanceof ByteBuffer) { - ByteBuffer byteBuffer = (ByteBuffer) dbzObj; - byte[] bytes = new byte[byteBuffer.remaining()]; - byteBuffer.get(bytes); - return bytes; - } else { - throw new UnsupportedOperationException( - "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName()); - } - } - }; + protected Object convertToBinary(Object dbzObj, Schema schema) { + if (dbzObj instanceof byte[]) { + return dbzObj; + } else if (dbzObj instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) dbzObj; + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } else { + throw new UnsupportedOperationException( + "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName()); + } } - private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) { + protected Object convertToDecimal(DecimalType decimalType, Object dbzObj, Schema schema) { final int precision = decimalType.getPrecision(); final int scale = decimalType.getScale(); - return new DeserializationRuntimeConverter() { - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) { - BigDecimal bigDecimal; - if (dbzObj instanceof byte[]) { - // decimal.handling.mode=precise - bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj); - } else if (dbzObj instanceof String) { - // decimal.handling.mode=string - bigDecimal = new BigDecimal((String) dbzObj); - } else if (dbzObj instanceof Double) { - // decimal.handling.mode=double - bigDecimal = BigDecimal.valueOf((Double) dbzObj); - } else { - if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { - SpecialValueDecimal decimal = - VariableScaleDecimal.toLogical((Struct) dbzObj); - bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO); - } else { - // fallback to string - bigDecimal = new BigDecimal(dbzObj.toString()); - } - } - return DecimalData.fromBigDecimal(bigDecimal, precision, scale); + BigDecimal bigDecimal; + if (dbzObj instanceof byte[]) { + // decimal.handling.mode=precise + bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj); + } else if (dbzObj instanceof String) { + // decimal.handling.mode=string + bigDecimal = new BigDecimal((String) dbzObj); + } else if (dbzObj instanceof Double) { + // decimal.handling.mode=double + bigDecimal = BigDecimal.valueOf((Double) dbzObj); + } else { + if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { + SpecialValueDecimal decimal = VariableScaleDecimal.toLogical((Struct) dbzObj); + bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO); + } else { + // fallback to string + bigDecimal = new BigDecimal(dbzObj.toString()); } - }; + } + return DecimalData.fromBigDecimal(bigDecimal, precision, scale); } - private static 
DeserializationRuntimeConverter createRowConverter( - RowType rowType, ZoneId serverTimeZone) { - final DeserializationRuntimeConverter[] fieldConverters = + protected Object convertToRecord(RowType rowType, Object dbzObj, Schema schema) + throws Exception { + DeserializationRuntimeConverter[] fieldConverters = rowType.getFields().stream() .map(DataField::getType) - .map(logicType -> createConverter(logicType, serverTimeZone)) + .map(this::createConverter) .toArray(DeserializationRuntimeConverter[]::new); - final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); - final BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); - - return new DeserializationRuntimeConverter() { - - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Object dbzObj, Schema schema) throws Exception { - Struct struct = (Struct) dbzObj; - int arity = fieldNames.length; - Object[] fields = new Object[arity]; - for (int i = 0; i < arity; i++) { - String fieldName = fieldNames[i]; - Field field = schema.field(fieldName); - if (field == null) { - fields[i] = null; - } else { - Object fieldValue = struct.getWithoutDefault(fieldName); - Schema fieldSchema = schema.field(fieldName).schema(); - Object convertedField = - convertField(fieldConverters[i], fieldValue, fieldSchema); - fields[i] = convertedField; - } - } - return generator.generate(fields); + String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); + + Struct struct = (Struct) dbzObj; + int arity = fieldNames.length; + Object[] fields = new Object[arity]; + for (int i = 0; i < arity; i++) { + String fieldName = fieldNames[i]; + Field field = schema.field(fieldName); + if (field == null) { + fields[i] = null; + } else { + Object fieldValue = struct.getWithoutDefault(fieldName); + Schema fieldSchema = schema.field(fieldName).schema(); + Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema); + fields[i] = convertedField; } - }; + } + return generator.generate(fields); } private static Object convertField( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumSchemaDataTypeInference.java new file mode 100644 index 000000000..533a07e4d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumSchemaDataTypeInference.java @@ -0,0 +1,198 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.debezium.event; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.types.DataField; +import com.ververica.cdc.common.types.DataType; +import com.ververica.cdc.common.types.DataTypes; +import io.debezium.data.VariableScaleDecimal; +import io.debezium.data.geometry.Geometry; +import io.debezium.data.geometry.Point; +import io.debezium.time.MicroTime; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.NanoTime; +import io.debezium.time.NanoTimestamp; +import io.debezium.time.Time; +import io.debezium.time.Timestamp; +import io.debezium.time.ZonedTimestamp; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.time.Instant; +import java.util.Optional; + +import static com.ververica.cdc.common.types.DecimalType.DEFAULT_PRECISION; + +/** {@link DataType} inference for debezium {@link Schema}. */ +@Internal +public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public DataType infer(Object value, Schema schema) { + return schema.isOptional() + ? infer(value, schema, schema.type()) + : infer(value, schema, schema.type()).notNull(); + } + + protected DataType infer(Object value, Schema schema, Schema.Type type) { + switch (type) { + case INT8: + return inferInt8(value, schema); + case INT16: + return inferInt16(value, schema); + case INT32: + return inferInt32(value, schema); + case INT64: + return inferInt64(value, schema); + case FLOAT32: + return inferFloat32(value, schema); + case FLOAT64: + return inferFloat64(value, schema); + case BOOLEAN: + return inferBoolean(value, schema); + case STRING: + return inferString(value, schema); + case BYTES: + return inferBytes(value, schema); + case STRUCT: + return inferStruct(value, schema); + case ARRAY: + return inferArray(value, schema); + case MAP: + return inferMap(value, schema); + default: + throw new UnsupportedOperationException( + "Unsupported type: " + schema.type().getName()); + } + } + + protected DataType inferBoolean(Object value, Schema schema) { + return DataTypes.BOOLEAN(); + } + + protected DataType inferInt8(Object value, Schema schema) { + return DataTypes.TINYINT(); + } + + protected DataType inferInt16(Object value, Schema schema) { + return DataTypes.SMALLINT(); + } + + protected DataType inferInt32(Object value, Schema schema) { + if (Date.LOGICAL_NAME.equals(schema.name())) { + return DataTypes.DATE(); + } + if (Time.SCHEMA_NAME.equals(schema.name())) { + return DataTypes.TIME(3); + } + return DataTypes.INT(); + } + + protected DataType inferInt64(Object value, Schema schema) { + if (MicroTime.SCHEMA_NAME.equals(schema.name())) { + return DataTypes.TIME(6); + } + if (NanoTime.SCHEMA_NAME.equals(schema.name())) { + return DataTypes.TIME(9); + } + if (Timestamp.SCHEMA_NAME.equals(schema.name())) { + return DataTypes.TIMESTAMP(3); + } + if (MicroTimestamp.SCHEMA_NAME.equals(schema.name())) { + return DataTypes.TIMESTAMP(6); + } + if (NanoTimestamp.SCHEMA_NAME.equals(schema.name())) { + return DataTypes.TIMESTAMP(9); + } + return DataTypes.BIGINT(); + } + + protected DataType inferFloat32(Object value, Schema schema) { + return DataTypes.FLOAT(); + } + + protected DataType inferFloat64(Object value, Schema schema) { + return 
DataTypes.DOUBLE(); + } + + protected DataType inferString(Object value, Schema schema) { + if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) { + int nano = + Optional.ofNullable((String) value) + .map(Instant::parse) + .map(Instant::getNano) + .orElse(0); + + int precision; + if (nano == 0) { + precision = 0; + } else if (nano % 1000 > 0) { + precision = 9; + } else if (nano % 1000_000 > 0) { + precision = 6; + } else if (nano % 1000_000_000 > 0) { + precision = 3; + } else { + precision = 0; + } + return DataTypes.TIMESTAMP(precision); + } + return DataTypes.STRING(); + } + + protected DataType inferBytes(Object value, Schema schema) { + if (Decimal.LOGICAL_NAME.equals(schema.name()) + || VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { + if (value instanceof BigDecimal) { + BigDecimal decimal = (BigDecimal) value; + return DataTypes.DECIMAL(decimal.precision(), decimal.scale()); + } + return DataTypes.DECIMAL(DEFAULT_PRECISION, 0); + } + return DataTypes.BYTES(); + } + + protected DataType inferStruct(Object value, Schema schema) { + Struct struct = (Struct) value; + if (Geometry.LOGICAL_NAME.equals(schema.name()) + || Point.LOGICAL_NAME.equals(schema.name())) { + return DataTypes.STRING(); + } + return DataTypes.ROW( + schema.fields().stream() + .map( + f -> + DataTypes.FIELD( + f.name(), infer(struct.get(f.name()), f.schema()))) + .toArray(DataField[]::new)); + } + + protected DataType inferArray(Object value, Schema schema) { + throw new UnsupportedOperationException("Unsupported type ARRAY"); + } + + protected DataType inferMap(Object value, Schema schema) { + throw new UnsupportedOperationException("Unsupported type MAP"); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SchemaDataTypeInference.java new file mode 100644 index 000000000..cd6cebf68 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SchemaDataTypeInference.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.debezium.event; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.types.DataType; +import org.apache.kafka.connect.data.Schema; + +/** {@link DataType} inference for kafka connect {@link Schema}. */ +@Internal +public interface SchemaDataTypeInference { + + /** + * Infer {@link DataType} from {@link Schema}. 
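+ * <p>For example, the default Debezium implementation infers an INT64 field whose
+ * schema name is {@code io.debezium.time.MicroTimestamp} as {@code TIMESTAMP(6)}.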
+ *
+ * @param value the value corresponding to the schema
+ * @param schema the Kafka Connect schema
+ * @return the inferred data type
+ */
+ DataType infer(Object value, Schema schema);
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SourceRecordEventDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SourceRecordEventDeserializer.java
index cb38fef1d..98fb65d5f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SourceRecordEventDeserializer.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/SourceRecordEventDeserializer.java
@@ -68,10 +68,10 @@ public abstract class SourceRecordEventDeserializer implements EventDeserializer
     protected abstract List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record)
             throws Exception;
 
-    /** Get {@link TableId} from given record. */
+    /** Get {@link TableId} from a data change record. */
     protected abstract TableId getTableId(SourceRecord record);
 
-    /** Get metadata from given record. */
+    /** Get metadata from a data change record. */
    protected abstract Map<String, String> getMetadata(SourceRecord record);
 
     public static Schema fieldSchema(Schema schema, String fieldName) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTypeUtils.java
index 5817c2314..8fee59f03 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTypeUtils.java
@@ -27,6 +27,12 @@ public class MySqlTypeUtils {
     // ------ MySQL Type ------
     // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
     private static final String BIT = "BIT";
+    /*
+     * The BOOLEAN type is returned when handling a change event from SQL like:
+     * ALTER TABLE `student` CHANGE COLUMN `is_male` `is_female` BOOLEAN NULL;
+     */
+    private static final String BOOLEAN = "BOOLEAN";
+    private static final String BOOL = "BOOL";
     private static final String TINYINT = "TINYINT";
     private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
     private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL";
@@ -39,6 +45,9 @@ public class MySqlTypeUtils {
     private static final String INT = "INT";
     private static final String INT_UNSIGNED = "INT UNSIGNED";
     private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL";
+    private static final String INTEGER = "INTEGER";
+    private static final String INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String INTEGER_UNSIGNED_ZEROFILL = "INTEGER UNSIGNED ZEROFILL";
     private static final String BIGINT = "BIGINT";
     private static final String SERIAL = "SERIAL";
     private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
@@ -86,6 +95,14 @@ public class MySqlTypeUtils {
     private static final String SET = "SET";
     private static final String ENUM = "ENUM";
     private static final String GEOMETRY = "GEOMETRY";
+    private static final String POINT = "POINT";
+
private static final String LINESTRING = "LINESTRING"; + private static final String POLYGON = "POLYGON"; + private static final String GEOMCOLLECTION = "GEOMCOLLECTION"; + private static final String GEOMETRYCOLLECTION = "GEOMETRYCOLLECTION"; + private static final String MULTIPOINT = "MULTIPOINT"; + private static final String MULTIPOLYGON = "MULTIPOLYGON"; + private static final String MULTILINESTRING = "MULTILINESTRING"; private static final String UNKNOWN = "UNKNOWN"; /** Returns a corresponding Flink data type from a debezium {@link Column}. */ @@ -105,6 +122,13 @@ public class MySqlTypeUtils { private static DataType convertFromColumn(Column column) { String typeName = column.typeName(); switch (typeName) { + case BIT: + return column.length() == 1 + ? DataTypes.BOOLEAN() + : DataTypes.BINARY(column.length() / 8); + case BOOL: + case BOOLEAN: + return DataTypes.BOOLEAN(); case TINYINT: // MySQL haven't boolean type, it uses tinyint(1) to represents boolean type // user should not use tinyint(1) to store number although jdbc url parameter @@ -118,10 +142,14 @@ public class MySqlTypeUtils { case SMALLINT_UNSIGNED: case SMALLINT_UNSIGNED_ZEROFILL: case INT: + case INTEGER: case MEDIUMINT: + case YEAR: return DataTypes.INT(); case INT_UNSIGNED: case INT_UNSIGNED_ZEROFILL: + case INTEGER_UNSIGNED: + case INTEGER_UNSIGNED_ZEROFILL: case MEDIUMINT_UNSIGNED: case MEDIUMINT_UNSIGNED_ZEROFILL: case BIGINT: @@ -161,25 +189,49 @@ public class MySqlTypeUtils { case DATE: return DataTypes.DATE(); case DATETIME: - case TIMESTAMP: return column.length() >= 0 ? DataTypes.TIMESTAMP(column.length()) - : DataTypes.TIMESTAMP(); + : DataTypes.TIMESTAMP(0); + case TIMESTAMP: + return column.length() >= 0 + ? DataTypes.TIMESTAMP_LTZ(column.length()) + : DataTypes.TIMESTAMP_LTZ(0); case CHAR: return DataTypes.CHAR(column.length()); case VARCHAR: return DataTypes.VARCHAR(column.length()); + case TINYTEXT: case TEXT: + case MEDIUMTEXT: + case LONGTEXT: + case JSON: + case ENUM: + case GEOMETRY: + case POINT: + case LINESTRING: + case POLYGON: + case GEOMETRYCOLLECTION: + case GEOMCOLLECTION: + case MULTIPOINT: + case MULTIPOLYGON: + case MULTILINESTRING: return DataTypes.STRING(); case BINARY: return DataTypes.BINARY(column.length()); case VARBINARY: return DataTypes.VARBINARY(column.length()); + case TINYBLOB: case BLOB: + case MEDIUMBLOB: + case LONGBLOB: return DataTypes.BYTES(); + case SET: + return DataTypes.ARRAY(DataTypes.STRING()); default: throw new UnsupportedOperationException( String.format("Don't support MySQL type '%s' yet.", typeName)); } } + + private MySqlTypeUtils() {} } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java index 52cb995e5..c29154b4e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSource.java @@ -16,7 +16,6 @@ package com.ververica.cdc.connectors.mysql.source; -import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import 
org.apache.flink.api.connector.source.Source; @@ -48,11 +47,13 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator; import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; +import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter; import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader; import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext; import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState; import com.ververica.cdc.connectors.mysql.source.split.SourceRecords; import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks; import com.ververica.cdc.connectors.mysql.table.StartupMode; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java new file mode 100644 index 000000000..686e9722c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java @@ -0,0 +1,1120 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary; +import io.debezium.DebeziumException; +import io.debezium.annotation.Immutable; +import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; +import io.debezium.data.Json; +import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.Column; +import io.debezium.relational.Table; +import io.debezium.relational.ValueConverter; +import io.debezium.time.Year; +import io.debezium.util.Strings; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteOrder; +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.sql.Types; +import java.time.Duration; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.time.temporal.ChronoField; +import java.time.temporal.ChronoUnit; +import java.time.temporal.Temporal; +import java.time.temporal.TemporalAdjuster; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Copied from Debezium project to fix FLOAT converted to FLOAT64 type issue. 
See DBZ-3865,
+ * DBZ-5843. Remove it when the Debezium version is upgraded above 2.0.0.Final.
+ *
+ * <p>Line 240 & 246: add FLOAT type adjustment.
+ *
+ * <p>
Line 485 & 499: add method to convert FLOAT value. + */ +@Immutable +public class MySqlValueConverters extends JdbcValueConverters { + + @FunctionalInterface + public static interface ParsingErrorHandler { + void error(String message, Exception exception); + } + + private static final Logger LOGGER = LoggerFactory.getLogger(MySqlValueConverters.class); + + /** Used to parse values of TIME columns. Format: 000:00:00.000000. */ + private static final Pattern TIME_FIELD_PATTERN = + Pattern.compile("(\\-?[0-9]*):([0-9]*):([0-9]*)(\\.([0-9]*))?"); + + /** Used to parse values of DATE columns. Format: 000-00-00. */ + private static final Pattern DATE_FIELD_PATTERN = Pattern.compile("([0-9]*)-([0-9]*)-([0-9]*)"); + + /** Used to parse values of TIMESTAMP columns. Format: 000-00-00 00:00:00.000. */ + private static final Pattern TIMESTAMP_FIELD_PATTERN = + Pattern.compile("([0-9]*)-([0-9]*)-([0-9]*) .*"); + + /** + * A utility method that adjusts ambiguous 2-digit + * year values of DATETIME, DATE, and TIMESTAMP types using these MySQL-specific rules: + * + *

+     * <ul>
+     *   <li>Year values in the range 00-69 are converted to 2000-2069.
+     *   <li>Year values in the range 70-99 are converted to 1970-1999.
+     * </ul>
+     *
+     * @param temporal the temporal instance to adjust; may not be null
+     * @return the possibly adjusted temporal instance; never null
+     */
+    public static Temporal adjustTemporal(Temporal temporal) {
+        if (temporal.isSupported(ChronoField.YEAR)) {
+            int year = temporal.get(ChronoField.YEAR);
+            if (0 <= year && year <= 69) {
+                temporal = temporal.plus(2000, ChronoUnit.YEARS);
+            } else if (70 <= year && year <= 99) {
+                temporal = temporal.plus(1900, ChronoUnit.YEARS);
+            }
+        }
+        return temporal;
+    }
+
+    private final ParsingErrorHandler parsingErrorHandler;
+
+    /**
+     * Create a new instance that always uses UTC for the default time zone when converting
+     * values without timezone information to values that require timezones.
+     *
+     * <p>
+ * + * @param decimalMode how {@code DECIMAL} and {@code NUMERIC} values should be treated; may be + * null if {@link io.debezium.jdbc.JdbcValueConverters.DecimalMode#PRECISE} is to be used + * @param temporalPrecisionMode temporal precision mode based on {@link + * io.debezium.jdbc.TemporalPrecisionMode} + * @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null + * if {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used + * @param binaryMode how binary columns should be represented + */ + public MySqlValueConverters( + DecimalMode decimalMode, + TemporalPrecisionMode temporalPrecisionMode, + BigIntUnsignedMode bigIntUnsignedMode, + BinaryHandlingMode binaryMode) { + this( + decimalMode, + temporalPrecisionMode, + bigIntUnsignedMode, + binaryMode, + x -> x, + MySqlValueConverters::defaultParsingErrorHandler); + } + + /** + * Create a new instance that always uses UTC for the default time zone when converting values + * without timezone information to values that require timezones. + * + *
<p>
+ * + * @param decimalMode how {@code DECIMAL} and {@code NUMERIC} values should be treated; may be + * null if {@link io.debezium.jdbc.JdbcValueConverters.DecimalMode#PRECISE} is to be used + * @param temporalPrecisionMode temporal precision mode based on {@link + * io.debezium.jdbc.TemporalPrecisionMode} + * @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null + * if {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used + * @param binaryMode how binary columns should be represented + * @param adjuster a temporal adjuster to make a database specific time modification before + * conversion + * @param handler for errors during postponed binlog parsing + */ + public MySqlValueConverters( + DecimalMode decimalMode, + TemporalPrecisionMode temporalPrecisionMode, + BigIntUnsignedMode bigIntUnsignedMode, + BinaryHandlingMode binaryMode, + TemporalAdjuster adjuster, + ParsingErrorHandler parsingErrorHandler) { + super( + decimalMode, + temporalPrecisionMode, + ZoneOffset.UTC, + adjuster, + bigIntUnsignedMode, + binaryMode); + this.parsingErrorHandler = parsingErrorHandler; + } + + @Override + protected ByteOrder byteOrderOfBitType() { + return ByteOrder.BIG_ENDIAN; + } + + @Override + public SchemaBuilder schemaBuilder(Column column) { + // Handle a few MySQL-specific types based upon how they are handled by the MySQL binlog + // client ... + String typeName = column.typeName().toUpperCase(); + if (matches(typeName, "JSON")) { + return Json.builder(); + } + if (matches(typeName, "POINT")) { + return io.debezium.data.geometry.Point.builder(); + } + if (matches(typeName, "GEOMETRY") + || matches(typeName, "LINESTRING") + || matches(typeName, "POLYGON") + || matches(typeName, "MULTIPOINT") + || matches(typeName, "MULTILINESTRING") + || matches(typeName, "MULTIPOLYGON") + || isGeometryCollection(typeName)) { + return io.debezium.data.geometry.Geometry.builder(); + } + if (matches(typeName, "YEAR")) { + return Year.builder(); + } + if (matches(typeName, "ENUM")) { + String commaSeparatedOptions = extractEnumAndSetOptionsAsString(column); + return io.debezium.data.Enum.builder(commaSeparatedOptions); + } + if (matches(typeName, "SET")) { + String commaSeparatedOptions = extractEnumAndSetOptionsAsString(column); + return io.debezium.data.EnumSet.builder(commaSeparatedOptions); + } + if (matches(typeName, "SMALLINT UNSIGNED") + || matches(typeName, "SMALLINT UNSIGNED ZEROFILL") + || matches(typeName, "INT2 UNSIGNED") + || matches(typeName, "INT2 UNSIGNED ZEROFILL")) { + // In order to capture unsigned SMALLINT 16-bit data source, INT32 will be required to + // safely capture all valid values + // Source: + // https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html + return SchemaBuilder.int32(); + } + if (matches(typeName, "INT UNSIGNED") + || matches(typeName, "INT UNSIGNED ZEROFILL") + || matches(typeName, "INT4 UNSIGNED") + || matches(typeName, "INT4 UNSIGNED ZEROFILL")) { + // In order to capture unsigned INT 32-bit data source, INT64 will be required to safely + // capture all valid values + // Source: + // https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html + return SchemaBuilder.int64(); + } + if (matches(typeName, "BIGINT UNSIGNED") + || matches(typeName, "BIGINT UNSIGNED ZEROFILL") + || matches(typeName, "INT8 UNSIGNED") + || matches(typeName, "INT8 UNSIGNED ZEROFILL")) { + switch (super.bigIntUnsignedMode) { + case LONG: + return SchemaBuilder.int64(); + case 
PRECISE: + // In order to capture unsigned INT 64-bit data source, + // org.apache.kafka.connect.data.Decimal:Byte will be required to safely capture + // all valid values with scale of 0 + // Source: + // https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html + return Decimal.builder(0); + } + } + if ((matches(typeName, "FLOAT") + || matches(typeName, "FLOAT UNSIGNED") + || matches(typeName, "FLOAT UNSIGNED ZEROFILL")) + && !column.scale().isPresent() + && column.length() <= 24) { + return SchemaBuilder.float32(); + } + // Otherwise, let the base class handle it ... + return super.schemaBuilder(column); + } + + @Override + public ValueConverter converter(Column column, Field fieldDefn) { + // Handle a few MySQL-specific types based upon how they are handled by the MySQL binlog + // client ... + String typeName = column.typeName().toUpperCase(); + if (matches(typeName, "JSON")) { + return (data) -> convertJson(column, fieldDefn, data); + } + if (matches(typeName, "GEOMETRY") + || matches(typeName, "LINESTRING") + || matches(typeName, "POLYGON") + || matches(typeName, "MULTIPOINT") + || matches(typeName, "MULTILINESTRING") + || matches(typeName, "MULTIPOLYGON") + || isGeometryCollection(typeName)) { + return (data -> convertGeometry(column, fieldDefn, data)); + } + if (matches(typeName, "POINT")) { + // backwards compatibility + return (data -> convertPoint(column, fieldDefn, data)); + } + if (matches(typeName, "YEAR")) { + return (data) -> convertYearToInt(column, fieldDefn, data); + } + if (matches(typeName, "ENUM")) { + // Build up the character array based upon the column's type ... + List options = extractEnumAndSetOptions(column); + return (data) -> convertEnumToString(options, column, fieldDefn, data); + } + if (matches(typeName, "SET")) { + // Build up the character array based upon the column's type ... 
+ List options = extractEnumAndSetOptions(column); + return (data) -> convertSetToString(options, column, fieldDefn, data); + } + if (matches(typeName, "TINYINT UNSIGNED") + || matches(typeName, "TINYINT UNSIGNED ZEROFILL") + || matches(typeName, "INT1 UNSIGNED") + || matches(typeName, "INT1 UNSIGNED ZEROFILL")) { + // Convert TINYINT UNSIGNED internally from SIGNED to UNSIGNED based on the boundary + // settings + return (data) -> convertUnsignedTinyint(column, fieldDefn, data); + } + if (matches(typeName, "SMALLINT UNSIGNED") + || matches(typeName, "SMALLINT UNSIGNED ZEROFILL") + || matches(typeName, "INT2 UNSIGNED") + || matches(typeName, "INT2 UNSIGNED ZEROFILL")) { + // Convert SMALLINT UNSIGNED internally from SIGNED to UNSIGNED based on the boundary + // settings + return (data) -> convertUnsignedSmallint(column, fieldDefn, data); + } + if (matches(typeName, "MEDIUMINT UNSIGNED") + || matches(typeName, "MEDIUMINT UNSIGNED ZEROFILL") + || matches(typeName, "INT3 UNSIGNED") + || matches(typeName, "INT3 UNSIGNED ZEROFILL") + || matches(typeName, "MIDDLEINT UNSIGNED") + || matches(typeName, "MIDDLEINT UNSIGNED ZEROFILL")) { + // Convert MEDIUMINT UNSIGNED internally from SIGNED to UNSIGNED based on the boundary + // settings + return (data) -> convertUnsignedMediumint(column, fieldDefn, data); + } + if (matches(typeName, "INT UNSIGNED") + || matches(typeName, "INT UNSIGNED ZEROFILL") + || matches(typeName, "INT4 UNSIGNED") + || matches(typeName, "INT4 UNSIGNED ZEROFILL")) { + // Convert INT UNSIGNED internally from SIGNED to UNSIGNED based on the boundary + // settings + return (data) -> convertUnsignedInt(column, fieldDefn, data); + } + if (matches(typeName, "BIGINT UNSIGNED") + || matches(typeName, "BIGINT UNSIGNED ZEROFILL") + || matches(typeName, "INT8 UNSIGNED") + || matches(typeName, "INT8 UNSIGNED ZEROFILL")) { + switch (super.bigIntUnsignedMode) { + case LONG: + return (data) -> convertBigInt(column, fieldDefn, data); + case PRECISE: + // Convert BIGINT UNSIGNED internally from SIGNED to UNSIGNED based on the + // boundary settings + return (data) -> convertUnsignedBigint(column, fieldDefn, data); + } + } + + // We have to convert bytes encoded in the column's character set ... + switch (column.jdbcType()) { + case Types.CHAR: // variable-length + case Types.VARCHAR: // variable-length + case Types.LONGVARCHAR: // variable-length + case Types.CLOB: // variable-length + case Types.NCHAR: // fixed-length + case Types.NVARCHAR: // fixed-length + case Types.LONGNVARCHAR: // fixed-length + case Types.NCLOB: // fixed-length + case Types.DATALINK: + case Types.SQLXML: + Charset charset = charsetFor(column); + if (charset != null) { + logger.debug("Using {} charset by default for column: {}", charset, column); + return (data) -> convertString(column, fieldDefn, charset, data); + } + logger.warn( + "Using UTF-8 charset by default for column without charset: {}", column); + return (data) -> convertString(column, fieldDefn, StandardCharsets.UTF_8, data); + case Types.TIME: + if (adaptiveTimeMicrosecondsPrecisionMode) { + return data -> convertDurationToMicroseconds(column, fieldDefn, data); + } + case Types.TIMESTAMP: + return ((ValueConverter) + (data -> convertTimestampToLocalDateTime(column, fieldDefn, data))) + .and(super.converter(column, fieldDefn)); + default: + break; + } + + // Otherwise, let the base class handle it ... 
+ return super.converter(column, fieldDefn); + } + + /** + * Return the {@link Charset} instance with the MySQL-specific character set name used by the + * given column. + * + * @param column the column in which the character set is used; never null + * @return the Java {@link Charset}, or null if there is no mapping + */ + protected Charset charsetFor(Column column) { + String mySqlCharsetName = column.charsetName(); + if (mySqlCharsetName == null) { + logger.warn("Column is missing a character set: {}", column); + return null; + } + String encoding = MySqlConnection.getJavaEncodingForMysqlCharSet(mySqlCharsetName); + if (encoding == null) { + logger.debug( + "Column uses MySQL character set '{}', which has no mapping to a Java character set, will try it in lowercase", + mySqlCharsetName); + encoding = + MySqlConnection.getJavaEncodingForMysqlCharSet(mySqlCharsetName.toLowerCase()); + } + if (encoding == null) { + logger.warn( + "Column uses MySQL character set '{}', which has no mapping to a Java character set", + mySqlCharsetName); + } else { + try { + return Charset.forName(encoding); + } catch (IllegalCharsetNameException e) { + logger.error( + "Unable to load Java charset '{}' for column with MySQL character set '{}'", + encoding, + mySqlCharsetName); + } + } + return null; + } + + /** + * Convert the {@link String} {@code byte[]} value to a string value used in a {@link + * SourceRecord}. + * + * @param column the column in which the value appears + * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never + * null + * @param data the data; may be null + * @return the converted value, or null if the conversion could not be made and the column + * allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not + * allow nulls + */ + protected Object convertJson(Column column, Field fieldDefn, Object data) { + return convertValue( + column, + fieldDefn, + data, + "{}", + (r) -> { + if (data instanceof byte[]) { + // The BinlogReader sees these JSON values as binary encoded, so we use the + // binlog client library's utility + // to parse MySQL's internal binary representation into a JSON string, using + // the standard formatter. + + if (((byte[]) data).length == 0) { + r.deliver(column.isOptional() ? null : "{}"); + } else { + try { + r.deliver(JsonBinary.parseAsString((byte[]) data)); + } catch (IOException e) { + parsingErrorHandler.error( + "Failed to parse and read a JSON value on '" + + column + + "' value " + + Arrays.toString((byte[]) data), + e); + r.deliver(column.isOptional() ? null : "{}"); + } + } + } else if (data instanceof String) { + // The SnapshotReader sees JSON values as UTF-8 encoded strings. 
+ r.deliver(data); + } + }); + } + + @Override + protected Object convertSmallInt(Column column, Field fieldDefn, Object data) { + // MySQL allows decimal default values for smallint columns + if (data instanceof String) { + data = Math.round(Double.parseDouble((String) data)); + } + + return super.convertSmallInt(column, fieldDefn, data); + } + + @Override + protected Object convertInteger(Column column, Field fieldDefn, Object data) { + // MySQL allows decimal default values for integer columns + if (data instanceof String) { + data = Math.round(Double.parseDouble((String) data)); + } + + return super.convertInteger(column, fieldDefn, data); + } + + @Override + protected Object convertBigInt(Column column, Field fieldDefn, Object data) { + // MySQL allows decimal default values for bigint columns + if (data instanceof String) { + data = Math.round(Double.parseDouble((String) data)); + } + + return super.convertBigInt(column, fieldDefn, data); + } + + /** + * MySql reports FLOAT(p) values as FLOAT and DOUBLE. A precision from 0 to 23 results in a + * 4-byte single-precision FLOAT column. A precision from 24 to 53 results in an 8-byte + * double-precision DOUBLE column. As of MySQL 8.0.17, the nonstandard FLOAT(M,D) and + * DOUBLE(M,D) syntax is deprecated, and should expect support for it be removed in a future + * version of MySQL. Based on this future, we didn't handle the case. + */ + @Override + protected Object convertFloat(Column column, Field fieldDefn, Object data) { + if (!column.scale().isPresent() && column.length() <= 24) { + return super.convertReal(column, fieldDefn, data); + } else { + return super.convertFloat(column, fieldDefn, data); + } + } + + /** + * Convert the {@link String} or {@code byte[]} value to a string value used in a {@link + * SourceRecord}. + * + * @param column the column in which the value appears + * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never + * null + * @param columnCharset the Java character set in which column byte[] values are encoded; may + * not be null + * @param data the data; may be null + * @return the converted value, or null if the conversion could not be made and the column + * allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not + * allow nulls + */ + protected Object convertString( + Column column, Field fieldDefn, Charset columnCharset, Object data) { + return convertValue( + column, + fieldDefn, + data, + "", + (r) -> { + if (data instanceof byte[]) { + // Decode the binary representation using the given character encoding ... + r.deliver(new String((byte[]) data, columnCharset)); + } else if (data instanceof String) { + r.deliver(data); + } + }); + } + + /** + * Converts a value object for a MySQL {@code YEAR}, which appear in the binlog as an integer + * though returns from the MySQL JDBC driver as either a short or a {@link java.sql.Date}. 
+ * + * @param column the column definition describing the {@code data} value; never null + * @param fieldDefn the field definition; never null + * @param data the data object to be converted into a year literal integer value; never null + * @return the converted value, or null if the conversion could not be made and the column + * allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not + * allow nulls + */ + @SuppressWarnings("deprecation") + protected Object convertYearToInt(Column column, Field fieldDefn, Object data) { + return convertValue( + column, + fieldDefn, + data, + 0, + (r) -> { + Object mutData = data; + if (data instanceof java.time.Year) { + // The MySQL binlog always returns a Year object ... + r.deliver( + adjustTemporal( + java.time.Year.of( + ((java.time.Year) data).getValue())) + .get(ChronoField.YEAR)); + } else if (data instanceof java.sql.Date) { + // MySQL JDBC driver sometimes returns a Java SQL Date object ... + // year from java.sql.Date is defined as number of years since 1900 + r.deliver(((java.sql.Date) data).getYear() + 1900); + } else if (data instanceof String) { + mutData = Integer.valueOf((String) data); + } + if (mutData instanceof Number) { + // MySQL JDBC driver sometimes returns a short ... + r.deliver( + adjustTemporal(java.time.Year.of(((Number) mutData).intValue())) + .get(ChronoField.YEAR)); + } + }); + } + + /** + * Converts a value object for a MySQL {@code ENUM}, which is represented in the binlog events + * as an integer value containing the index of the enum option. The MySQL JDBC driver returns a + * string containing the option, so this method calculates the same. + * + * @param options the characters that appear in the same order as defined in the column; may not + * be null + * @param column the column definition describing the {@code data} value; never null + * @param fieldDefn the field definition; never null + * @param data the data object to be converted into an {@code ENUM} literal String value + * @return the converted value, or null if the conversion could not be made and the column + * allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not + * allow nulls + */ + protected Object convertEnumToString( + List options, Column column, Field fieldDefn, Object data) { + return convertValue( + column, + fieldDefn, + data, + "", + (r) -> { + if (data instanceof String) { + // JDBC should return strings ... + r.deliver(data); + } else if (data instanceof Integer) { + if (options != null) { + // The binlog will contain an int with the 1-based index of the option + // in the enum value ... + int value = ((Integer) data).intValue(); + if (value == 0) { + // an invalid value was specified, which corresponds to the empty + // string '' and an index of 0 + r.deliver(""); + } + int index = value - 1; // 'options' is 0-based + if (index < options.size() && index >= 0) { + r.deliver(options.get(index)); + } + } else { + r.deliver(null); + } + } + }); + } + + /** + * Converts a value object for a MySQL {@code SET}, which is represented in the binlog events + * contain a long number in which every bit corresponds to a different option. The MySQL JDBC + * driver returns a string containing the comma-separated options, so this method calculates the + * same. 
+ * + * @param options the characters that appear in the same order as defined in the column; may not + * be null + * @param column the column definition describing the {@code data} value; never null + * @param fieldDefn the field definition; never null + * @param data the data object to be converted into an {@code SET} literal String value; never + * null + * @return the converted value, or null if the conversion could not be made and the column + * allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not + * allow nulls + */ + protected Object convertSetToString( + List options, Column column, Field fieldDefn, Object data) { + return convertValue( + column, + fieldDefn, + data, + "", + (r) -> { + if (data instanceof String) { + // JDBC should return strings ... + r.deliver(data); + } else if (data instanceof Long) { + // The binlog will contain a long with the indexes of the options in the set + // value ... + long indexes = ((Long) data).longValue(); + r.deliver(convertSetValue(column, indexes, options)); + } + }); + } + + /** + * Determine if the uppercase form of a column's type exactly matches or begins with the + * specified prefix. Note that this logic works when the column's {@link Column#typeName() type} + * contains the type name followed by parentheses. + * + * @param upperCaseTypeName the upper case form of the column's {@link Column#typeName() type + * name} + * @param upperCaseMatch the upper case form of the expected type or prefix of the type; may not + * be null + * @return {@code true} if the type matches the specified type, or {@code false} otherwise + */ + protected boolean matches(String upperCaseTypeName, String upperCaseMatch) { + if (upperCaseTypeName == null) { + return false; + } + return upperCaseMatch.equals(upperCaseTypeName) + || upperCaseTypeName.startsWith(upperCaseMatch + "("); + } + + /** + * Determine if the uppercase form of a column's type is geometry collection independent of JDBC + * driver or server version. + * + * @param upperCaseTypeName the upper case form of the column's {@link Column#typeName() type + * name} + * @return {@code true} if the type is geometry collection + */ + protected boolean isGeometryCollection(String upperCaseTypeName) { + if (upperCaseTypeName == null) { + return false; + } + + return upperCaseTypeName.equals("GEOMETRYCOLLECTION") + || upperCaseTypeName.equals("GEOMCOLLECTION") + || upperCaseTypeName.endsWith(".GEOMCOLLECTION"); + } + + protected List extractEnumAndSetOptions(Column column) { + return MySqlAntlrDdlParser.extractEnumAndSetOptions(column.enumValues()); + } + + protected String extractEnumAndSetOptionsAsString(Column column) { + return Strings.join(",", extractEnumAndSetOptions(column)); + } + + protected String convertSetValue(Column column, long indexes, List options) { + StringBuilder sb = new StringBuilder(); + int index = 0; + boolean first = true; + int optionLen = options.size(); + while (indexes != 0L) { + if (indexes % 2L != 0) { + if (first) { + first = false; + } else { + sb.append(','); + } + if (index < optionLen) { + sb.append(options.get(index)); + } else { + logger.warn("Found unexpected index '{}' on column {}", index, column); + } + } + ++index; + indexes = indexes >>> 1; + } + return sb.toString(); + } + + /** + * Convert the a value representing a POINT {@code byte[]} value to a Point value used in a + * {@link SourceRecord}. 
+     *
+     * @param column the column in which the value appears
+     * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never
+     *     null
+     * @param data the data; may be null
+     * @return the converted value, or null if the conversion could not be made and the column
+     *     allows nulls
+     * @throws IllegalArgumentException if the value could not be converted but the column does not
+     *     allow nulls
+     */
+    protected Object convertPoint(Column column, Field fieldDefn, Object data) {
+        final MySqlGeometry empty = MySqlGeometry.createEmpty();
+        return convertValue(
+                column,
+                fieldDefn,
+                data,
+                io.debezium.data.geometry.Geometry.createValue(
+                        fieldDefn.schema(), empty.getWkb(), empty.getSrid()),
+                (r) -> {
+                    if (data instanceof byte[]) {
+                        // The binlog utility sends a byte array for any Geometry type, we will use
+                        // our own binaryParse to parse the byte to WKB, hence
+                        // to the suitable class
+                        MySqlGeometry mySqlGeometry = MySqlGeometry.fromBytes((byte[]) data);
+                        if (mySqlGeometry.isPoint()) {
+                            r.deliver(
+                                    io.debezium.data.geometry.Point.createValue(
+                                            fieldDefn.schema(),
+                                            mySqlGeometry.getWkb(),
+                                            mySqlGeometry.getSrid()));
+                        } else {
+                            throw new ConnectException(
+                                    "Failed to parse and read a value of type POINT on " + column);
+                        }
+                    }
+                });
+    }
+
+    /**
+     * Convert a value representing a GEOMETRY {@code byte[]} value to a Geometry value used in
+     * a {@link SourceRecord}.
+     *
+     * @param column the column in which the value appears
+     * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never
+     *     null
+     * @param data the data; may be null
+     * @return the converted value, or null if the conversion could not be made and the column
+     *     allows nulls
+     * @throws IllegalArgumentException if the value could not be converted but the column does not
+     *     allow nulls
+     */
+    protected Object convertGeometry(Column column, Field fieldDefn, Object data) {
+        final MySqlGeometry empty = MySqlGeometry.createEmpty();
+        return convertValue(
+                column,
+                fieldDefn,
+                data,
+                io.debezium.data.geometry.Geometry.createValue(
+                        fieldDefn.schema(), empty.getWkb(), empty.getSrid()),
+                (r) -> {
+                    if (data instanceof byte[]) {
+                        // The binlog utility sends a byte array for any Geometry type, we will
+                        // use our own binaryParse to parse the byte to WKB, hence
+                        // to the suitable class
+                        MySqlGeometry mySqlGeometry = MySqlGeometry.fromBytes((byte[]) data);
+                        r.deliver(
+                                io.debezium.data.geometry.Geometry.createValue(
+                                        fieldDefn.schema(),
+                                        mySqlGeometry.getWkb(),
+                                        mySqlGeometry.getSrid()));
+                    }
+                });
+    }
+
+    @Override
+    protected byte[] normalizeBinaryData(Column column, byte[] data) {
+        // DBZ-254 right-pad fixed-length binary column values with 0x00 (zero byte)
+        if (column.jdbcType() == Types.BINARY && data.length < column.length()) {
+            data = Arrays.copyOf(data, column.length());
+        }
+
+        return super.normalizeBinaryData(column, data);
+    }
+
+    /**
+     * Convert a value representing an unsigned TINYINT value to the correct unsigned TINYINT
+     * representation.
+     *
+     * @param column the column in which the value appears
+     * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never
+     *     null
+     * @param data the data; may be null
+     * @return the converted value, or null if the conversion could not be made and the column
+     *     allows nulls
+     * @throws IllegalArgumentException if the value could not be converted but the column does not
+     *     allow nulls
+     */
+    protected Object convertUnsignedTinyint(Column column, Field fieldDefn, Object data) {
+        return convertValue(
+                column,
+                fieldDefn,
+                data,
+                (short) 0,
+                (r) -> {
+                    if (data instanceof Short) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedTinyint((short) data));
+                    } else if (data instanceof Number) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedTinyint(
+                                        ((Number) data).shortValue()));
+                    } else {
+                        // We continue with the original converting method (smallint) since we have
+                        // an unsigned Tinyint
+                        r.deliver(convertSmallInt(column, fieldDefn, data));
+                    }
+                });
+    }
+
+    /**
+     * Convert a value representing an unsigned SMALLINT value to the correct unsigned SMALLINT
+     * representation.
+     *
+     * @param column the column in which the value appears
+     * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never
+     *     null
+     * @param data the data; may be null
+     * @return the converted value, or null if the conversion could not be made and the column
+     *     allows nulls
+     * @throws IllegalArgumentException if the value could not be converted but the column does not
+     *     allow nulls
+     */
+    protected Object convertUnsignedSmallint(Column column, Field fieldDefn, Object data) {
+        return convertValue(
+                column,
+                fieldDefn,
+                data,
+                0,
+                (r) -> {
+                    if (data instanceof Integer) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedSmallint((int) data));
+                    } else if (data instanceof Number) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedSmallint(
+                                        ((Number) data).intValue()));
+                    } else {
+                        // We continue with the original converting method (integer) since we have
+                        // an unsigned Smallint
+                        r.deliver(convertInteger(column, fieldDefn, data));
+                    }
+                });
+    }
+
+    /**
+     * Convert a value representing an unsigned MEDIUMINT value to the correct unsigned MEDIUMINT
+     * representation.
+     *
+     * @param column the column in which the value appears
+     * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never
+     *     null
+     * @param data the data; may be null
+     * @return the converted value, or null if the conversion could not be made and the column
+     *     allows nulls
+     * @throws IllegalArgumentException if the value could not be converted but the column does not
+     *     allow nulls
+     */
+    protected Object convertUnsignedMediumint(Column column, Field fieldDefn, Object data) {
+        return convertValue(
+                column,
+                fieldDefn,
+                data,
+                0,
+                (r) -> {
+                    if (data instanceof Integer) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedMediumint((int) data));
+                    } else if (data instanceof Number) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedMediumint(
+                                        ((Number) data).intValue()));
+                    } else {
+                        // We continue with the original converting method (integer) since we have
+                        // an unsigned Mediumint
+                        r.deliver(convertInteger(column, fieldDefn, data));
+                    }
+                });
+    }
+
+    /**
+     * Convert a value representing an unsigned INT value to the correct unsigned INT
+     * representation.
+     *
+     * @param column the column in which the value appears
+     * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never
+     *     null
+     * @param data the data; may be null
+     * @return the converted value, or null if the conversion could not be made and the column
+     *     allows nulls
+     * @throws IllegalArgumentException if the value could not be converted but the column does not
+     *     allow nulls
+     */
+    protected Object convertUnsignedInt(Column column, Field fieldDefn, Object data) {
+        return convertValue(
+                column,
+                fieldDefn,
+                data,
+                0L,
+                (r) -> {
+                    if (data instanceof Long) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedInteger((long) data));
+                    } else if (data instanceof Number) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedInteger(
+                                        ((Number) data).longValue()));
+                    } else {
+                        // We continue with the original converting method (bigint) since we have an
+                        // unsigned Integer
+                        r.deliver(convertBigInt(column, fieldDefn, data));
+                    }
+                });
+    }
+
+    /**
+     * Convert a value representing an unsigned BIGINT value to the correct unsigned BIGINT
+     * representation.
+     *
+     * @param column the column in which the value appears
+     * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never
+     *     null
+     * @param data the data; may be null
+     * @return the converted value, or null if the conversion could not be made and the column
+     *     allows nulls
+     * @throws IllegalArgumentException if the value could not be converted but the column does not
+     *     allow nulls
+     */
+    protected Object convertUnsignedBigint(Column column, Field fieldDefn, Object data) {
+        return convertValue(
+                column,
+                fieldDefn,
+                data,
+                0L,
+                (r) -> {
+                    if (data instanceof BigDecimal) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedBigint(
+                                        (BigDecimal) data));
+                    } else if (data instanceof Number) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedBigint(
+                                        new BigDecimal(((Number) data).toString())));
+                    } else if (data instanceof String) {
+                        r.deliver(
+                                MySqlUnsignedIntegerConverter.convertUnsignedBigint(
+                                        new BigDecimal((String) data)));
+                    } else {
+                        r.deliver(convertNumeric(column, fieldDefn, data));
+                    }
+                });
+    }
+
+    /**
+     * Converts a value object for an expected type of {@link java.time.Duration} to {@link Long}
+     * values that represent the time in microseconds.
+     *
+     * <p>
Per the JDBC specification, databases should return {@link java.sql.Time} instances, but + * that's not working because it can only handle Daytime 00:00:00-23:59:59. We use {@link + * java.time.Duration} instead that can handle the range of -838:59:59.000000 to + * 838:59:59.000000 of a MySQL TIME type and transfer data as signed INT64 which reflects the DB + * value converted to microseconds. + * + * @param column the column definition describing the {@code data} value; never null + * @param fieldDefn the field definition; never null + * @param data the data object to be converted into a {@link java.time.Duration} type; never + * null + * @return the converted value, or null if the conversion could not be made and the column + * allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not + * allow nulls + */ + protected Object convertDurationToMicroseconds(Column column, Field fieldDefn, Object data) { + return convertValue( + column, + fieldDefn, + data, + 0L, + (r) -> { + try { + if (data instanceof Duration) { + r.deliver(((Duration) data).toNanos() / 1_000); + } + } catch (IllegalArgumentException e) { + } + }); + } + + protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn, Object data) { + if (data == null && !fieldDefn.schema().isOptional()) { + return null; + } + if (!(data instanceof Timestamp)) { + return data; + } + + return ((Timestamp) data).toLocalDateTime(); + } + + public static Duration stringToDuration(String timeString) { + Matcher matcher = TIME_FIELD_PATTERN.matcher(timeString); + if (!matcher.matches()) { + throw new RuntimeException("Unexpected format for TIME column: " + timeString); + } + + long hours = Long.parseLong(matcher.group(1)); + long minutes = Long.parseLong(matcher.group(2)); + long seconds = Long.parseLong(matcher.group(3)); + long nanoSeconds = 0; + String microSecondsString = matcher.group(5); + if (microSecondsString != null) { + nanoSeconds = Long.parseLong(Strings.justifyLeft(microSecondsString, 9, '0')); + } + + if (hours >= 0) { + return Duration.ofHours(hours) + .plusMinutes(minutes) + .plusSeconds(seconds) + .plusNanos(nanoSeconds); + } else { + return Duration.ofHours(hours) + .minusMinutes(minutes) + .minusSeconds(seconds) + .minusNanos(nanoSeconds); + } + } + + public static LocalDate stringToLocalDate(String dateString, Column column, Table table) { + final Matcher matcher = DATE_FIELD_PATTERN.matcher(dateString); + if (!matcher.matches()) { + throw new RuntimeException("Unexpected format for DATE column: " + dateString); + } + + final int year = Integer.parseInt(matcher.group(1)); + final int month = Integer.parseInt(matcher.group(2)); + final int day = Integer.parseInt(matcher.group(3)); + + if (year == 0 || month == 0 || day == 0) { + LOGGER.warn( + "Invalid value '{}' stored in column '{}' of table '{}' converted to empty value", + dateString, + column.name(), + table.id()); + return null; + } + return LocalDate.of(year, month, day); + } + + public static boolean containsZeroValuesInDatePart( + String timestampString, Column column, Table table) { + final Matcher matcher = TIMESTAMP_FIELD_PATTERN.matcher(timestampString); + if (!matcher.matches()) { + throw new RuntimeException("Unexpected format for DATE column: " + timestampString); + } + + final int year = Integer.parseInt(matcher.group(1)); + final int month = Integer.parseInt(matcher.group(2)); + final int day = Integer.parseInt(matcher.group(3)); + + if (year == 0 || month == 0 || day == 0) { + 
LOGGER.warn( + "Invalid value '{}' stored in column '{}' of table '{}' converted to empty value", + timestampString, + column.name(), + table.id()); + return true; + } + return false; + } + + public static void defaultParsingErrorHandler(String message, Exception exception) { + throw new DebeziumException(message, exception); + } +}
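
Note on the TIMESTAMP precision inference added in DebeziumSchemaDataTypeInference#inferString above: for Debezium ZonedTimestamp values, the precision of the inferred TIMESTAMP type is derived from the nano-of-second of the parsed ISO-8601 string. The following self-contained sketch isolates that rule; the class and method names are illustrative only and not part of this patch.

    import java.time.Instant;

    public class PrecisionInferenceSketch {

        // Map the nano-of-second of a parsed ISO-8601 instant to the TIMESTAMP
        // precision used by the inference above: 0, 3, 6, or 9.
        static int inferPrecision(String zonedTimestamp) {
            int nano = Instant.parse(zonedTimestamp).getNano();
            if (nano == 0) {
                return 0; // no fractional seconds
            } else if (nano % 1_000 > 0) {
                return 9; // nanosecond digits present
            } else if (nano % 1_000_000 > 0) {
                return 6; // microsecond digits present
            } else {
                return 3; // at most millisecond digits present
            }
        }

        public static void main(String[] args) {
            System.out.println(inferPrecision("2023-12-04T23:04:41Z")); // 0
            System.out.println(inferPrecision("2023-12-04T23:04:41.500Z")); // 3
            System.out.println(inferPrecision("2023-12-04T23:04:41.500100Z")); // 6
            System.out.println(inferPrecision("2023-12-04T23:04:41.500100900Z")); // 9
        }
    }

Because trailing-zero fractional digits leave the lower remainders at zero, a value collapses to the coarsest precision that still represents it exactly, which is why only TIMESTAMP(0), (3), (6), and (9) are ever inferred.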
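Note on SET decoding: a MySQL SET value arrives from the binlog as a long bitmask in which the lowest bit selects the first declared option, and convertSetValue above walks that mask bit by bit. A minimal standalone equivalent follows; the names are illustrative only and, unlike the patched converter, it assumes every set bit maps to a declared option rather than logging a warning for out-of-range indexes.

    import java.util.Arrays;
    import java.util.List;

    public class SetDecodingSketch {

        // Walk the bitmask exactly like convertSetValue: test the lowest bit, emit
        // the option at the current index when it is set, then shift right by one.
        static String decodeSet(long indexes, List<String> options) {
            StringBuilder sb = new StringBuilder();
            int index = 0;
            boolean first = true;
            while (indexes != 0L) {
                if (indexes % 2L != 0) {
                    if (!first) {
                        sb.append(',');
                    }
                    first = false;
                    sb.append(options.get(index));
                }
                ++index;
                indexes = indexes >>> 1;
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            // A column declared as SET('a','b','c') with bits 1 and 3 set (0b101)
            // decodes to the first and third options.
            List<String> options = Arrays.asList("a", "b", "c");
            System.out.println(decodeSet(0b101L, options)); // prints "a,c"
        }
    }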