[cdc-pipeline-connector][mysql] Support more mysql types

pull/2810/head
Jiabao Sun 1 year ago committed by Hang Ruan
parent ec3d558485
commit 174282d3ef

@ -16,13 +16,13 @@
package com.ververica.cdc.common.event;
import com.ververica.cdc.common.annotation.Public;
import com.ververica.cdc.common.annotation.PublicEvolving;
import java.io.Serializable;
import java.util.List;
/** Deserializer to deserialize given record to {@link Event}. */
@Public
@PublicEvolving
public interface EventDeserializer<T> extends Serializable {
/** Deserialize given record to {@link Event}s. */

@ -85,7 +85,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
@Override
public DataSource createDataSource(Context context) {
final Configuration config = context.getConfiguration();
final Configuration config = context.getFactoryConfiguration();
String hostname = config.get(HOSTNAME);
int port = config.get(PORT);
@ -103,21 +103,17 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
double distributionFactorUpper =
config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
double distributionFactorLower =
config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
double distributionFactorUpper = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean closeIdleReaders =
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
validateIntegerOption(
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
@ -154,7 +150,12 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
.jdbcProperties(getJdbcProperties(configMap));
Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
configFactory.tableList(getTableList(configFactory.createConfig(0), selectors));
String[] capturedTables = getTableList(configFactory.createConfig(0), selectors);
if (capturedTables.length == 0) {
throw new IllegalArgumentException(
"Cannot find any table by the option 'tables' = " + tables);
}
configFactory.tableList(capturedTables);
return new MySqlDataSource(configFactory);
}
@ -232,8 +233,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
return getSpecificOffset(config);
case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
return StartupOptions.timestamp(
config.get(SCAN_STARTUP_TIMESTAMP_MILLIS));
return StartupOptions.timestamp(config.get(SCAN_STARTUP_TIMESTAMP_MILLIS));
default:
throw new ValidationException(
@ -250,12 +250,9 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
}
private static void validateSpecificOffset(Configuration config) {
Optional<String> gtidSet =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
Optional<String> binlogFilename =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
Optional<Long> binlogPosition =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
Optional<String> gtidSet = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
Optional<String> binlogFilename = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
Optional<Long> binlogPosition = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
if (!gtidSet.isPresent() && !(binlogFilename.isPresent() && binlogPosition.isPresent())) {
throw new ValidationException(
String.format(
@ -274,10 +271,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
.ifPresent(offsetBuilder::setGtidSet);
// Binlog file + pos
Optional<String> binlogFilename =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
Optional<Long> binlogPosition =
config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
Optional<String> binlogFilename = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
Optional<Long> binlogPosition = config.getOptional(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
if (binlogFilename.isPresent() && binlogPosition.isPresent()) {
offsetBuilder.setBinlogFilePosition(binlogFilename.get(), binlogPosition.get());
} else {

@ -17,6 +17,7 @@
package com.ververica.cdc.connectors.mysql.source;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.annotation.VisibleForTesting;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.source.DataSource;
import com.ververica.cdc.common.source.EventSourceProvider;
@ -64,4 +65,9 @@ public class MySqlDataSource implements DataSource {
/** Returns a fresh {@link MySqlMetadataAccessor} bound to this source's configuration. */
public MetadataAccessor getMetadataAccessor() {
return new MySqlMetadataAccessor(sourceConfig);
}
/** Exposes the underlying {@link MySqlSourceConfig}; intended for tests only. */
@VisibleForTesting
public MySqlSourceConfig getSourceConfig() {
return sourceConfig;
}
}

@ -228,5 +228,6 @@ public class MySqlDataSourceOptions {
ConfigOptions.key("schema-change.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether , by default is false.");
.withDescription(
"Whether send schema change events, by default is false. If set to false, the schema changes will not be sent.");
}

@ -16,13 +16,19 @@
package com.ververica.cdc.connectors.mysql.source;
import com.esri.core.geometry.ogc.OGCGeometry;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.connectors.mysql.source.parser.CustomMySqlAntlrDdlParser;
import com.ververica.cdc.debezium.event.DebeziumEventDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import io.debezium.data.Envelope;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecord;
import org.apache.kafka.connect.data.Schema;
@ -30,10 +36,13 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord;
@ -46,6 +55,8 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.mysql.SchemaChangeKey";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final boolean includeSchemaChanges;
private transient Tables tables;
@ -55,7 +66,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
DebeziumChangelogMode changelogMode,
ZoneId serverTimeZone,
boolean includeSchemaChanges) {
super(changelogMode, serverTimeZone);
super(new MySqlSchemaDataTypeInference(), changelogMode, serverTimeZone);
this.includeSchemaChanges = includeSchemaChanges;
}
@ -102,16 +113,44 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
@Override
protected TableId getTableId(SourceRecord record) {
if (isDataChangeRecord(record)) {
String[] parts = record.topic().split("\\.");
return TableId.tableId(parts[1], parts[2]);
}
// TODO: get table id from schema change record
return null;
String[] parts = record.topic().split("\\.");
return TableId.tableId(parts[1], parts[2]);
}
/** No per-record metadata is attached for MySQL records; always returns an empty map. */
@Override
protected Map<String, String> getMetadata(SourceRecord record) {
return Collections.emptyMap();
}
/**
 * Converts a Debezium value to a {@link BinaryStringData}.
 *
 * <p>MySQL spatial values (Debezium {@code Point}/{@code Geometry} structs) are rendered as a
 * GeoJSON-style document containing {@code type}, {@code coordinates}/{@code geometries} and
 * {@code srid}; every other value is stringified via {@code toString()}.
 */
@Override
protected Object convertToString(Object dbzObj, Schema schema) {
    String schemaName = schema.name();
    boolean isGeometry =
            Point.LOGICAL_NAME.equals(schemaName) || Geometry.LOGICAL_NAME.equals(schemaName);
    if (!isGeometry) {
        return BinaryStringData.fromString(dbzObj.toString());
    }
    try {
        Struct struct = (Struct) dbzObj;
        // The struct carries the geometry as WKB; re-encode it as GeoJSON.
        byte[] wkbBytes = struct.getBytes("wkb");
        String geoJson = OGCGeometry.fromBinary(ByteBuffer.wrap(wkbBytes)).asGeoJson();
        JsonNode parsedGeoNode = OBJECT_MAPPER.readTree(geoJson);
        String geometryType = parsedGeoNode.get("type").asText();
        Map<String, Object> geometryInfo = new HashMap<>();
        geometryInfo.put("type", geometryType);
        if (geometryType.equals("GeometryCollection")) {
            geometryInfo.put("geometries", parsedGeoNode.get("geometries"));
        } else {
            geometryInfo.put("coordinates", parsedGeoNode.get("coordinates"));
        }
        // A missing SRID defaults to 0.
        geometryInfo.put("srid", Optional.ofNullable(struct.getInt32("srid")).orElse(0));
        return BinaryStringData.fromString(
                OBJECT_MAPPER.writer().writeValueAsString(geometryInfo));
    } catch (Exception e) {
        throw new IllegalArgumentException(
                String.format("Failed to convert %s to geometry JSON.", dbzObj), e);
    }
}
}

@ -0,0 +1,43 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.debezium.event.DebeziumSchemaDataTypeInference;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import org.apache.kafka.connect.data.Schema;
/** {@link DataType} inference for MySQL debezium {@link Schema}. */
@Internal
public class MySqlSchemaDataTypeInference extends DebeziumSchemaDataTypeInference {
private static final long serialVersionUID = 1L;
protected DataType inferStruct(Object value, Schema schema) {
// the Geometry datatype in MySQL will be converted to
// a String with Json format
if (Point.LOGICAL_NAME.equals(schema.name())
|| Geometry.LOGICAL_NAME.equals(schema.name())) {
return DataTypes.STRING();
} else {
return super.inferStruct(value, schema);
}
}
}

@ -23,13 +23,13 @@ import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.connectors.mysql.schema.MySqlCdcCommonTypeUtils;
import com.ververica.cdc.connectors.mysql.schema.MySqlFieldDefinition;
import com.ververica.cdc.connectors.mysql.schema.MySqlTableDefinition;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import com.ververica.cdc.connectors.mysql.table.StartupMode;
import com.ververica.cdc.connectors.mysql.utils.MySqlTypeUtils;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.jdbc.JdbcConnection;
@ -200,7 +200,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
Column column = columns.get(i);
String colName = column.name();
DataType dataType = MySqlCdcCommonTypeUtils.fromDbzColumn(column);
DataType dataType = MySqlTypeUtils.fromDbzColumn(column);
if (!column.isOptional()) {
dataType = dataType.notNull();
}

@ -19,7 +19,6 @@ package com.ververica.cdc.connectors.mysql.utils;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Column;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.connectors.mysql.schema.MySqlCdcCommonTypeUtils;
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import io.debezium.connector.mysql.MySqlConnection;
@ -147,7 +146,7 @@ public class MySqlSchemaUtils {
public static Column toColumn(io.debezium.relational.Column column) {
return Column.physicalColumn(
column.name(), MySqlCdcCommonTypeUtils.fromDbzColumn(column), column.comment());
column.name(), MySqlTypeUtils.fromDbzColumn(column), column.comment());
}
public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {

@ -14,18 +14,24 @@
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.schema;
package com.ververica.cdc.connectors.mysql.utils;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import io.debezium.relational.Column;
/** Utilities for converting from MySQL types to {@link DataType}s. */
public class MySqlCdcCommonTypeUtils {
public class MySqlTypeUtils {
// ------ MySQL Type ------
// https://dev.mysql.com/doc/refman/8.0/en/data-types.html
private static final String BIT = "BIT";
/*
* BOOLEAN type will be returned when handling the change event from SQL like:
* ALTER TABLE `student` CHANGE COLUMN `is_male` `is_female` BOOLEAN NULL;
*/
private static final String BOOLEAN = "BOOLEAN";
private static final String BOOL = "BOOL";
private static final String TINYINT = "TINYINT";
private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL";
@ -38,6 +44,9 @@ public class MySqlCdcCommonTypeUtils {
private static final String INT = "INT";
private static final String INT_UNSIGNED = "INT UNSIGNED";
private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL";
private static final String INTEGER = "INTEGER";
private static final String INTEGER_UNSIGNED = "INTEGER UNSIGNED";
private static final String INTEGER_UNSIGNED_ZEROFILL = "INTEGER UNSIGNED ZEROFILL";
private static final String BIGINT = "BIGINT";
private static final String SERIAL = "SERIAL";
private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
@ -85,6 +94,14 @@ public class MySqlCdcCommonTypeUtils {
private static final String SET = "SET";
private static final String ENUM = "ENUM";
private static final String GEOMETRY = "GEOMETRY";
private static final String POINT = "POINT";
private static final String LINESTRING = "LINESTRING";
private static final String POLYGON = "POLYGON";
private static final String GEOMCOLLECTION = "GEOMCOLLECTION";
private static final String GEOMETRYCOLLECTION = "GEOMETRYCOLLECTION";
private static final String MULTIPOINT = "MULTIPOINT";
private static final String MULTIPOLYGON = "MULTIPOLYGON";
private static final String MULTILINESTRING = "MULTILINESTRING";
private static final String UNKNOWN = "UNKNOWN";
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
@ -104,6 +121,13 @@ public class MySqlCdcCommonTypeUtils {
private static DataType convertFromColumn(Column column) {
String typeName = column.typeName();
switch (typeName) {
case BIT:
return column.length() == 1
? DataTypes.BOOLEAN()
: DataTypes.BINARY(column.length() / 8);
case BOOL:
case BOOLEAN:
return DataTypes.BOOLEAN();
case TINYINT:
// MySQL haven't boolean type, it uses tinyint(1) to represents boolean type
// user should not use tinyint(1) to store number although jdbc url parameter
@ -117,10 +141,14 @@ public class MySqlCdcCommonTypeUtils {
case SMALLINT_UNSIGNED:
case SMALLINT_UNSIGNED_ZEROFILL:
case INT:
case INTEGER:
case MEDIUMINT:
case YEAR:
return DataTypes.INT();
case INT_UNSIGNED:
case INT_UNSIGNED_ZEROFILL:
case INTEGER_UNSIGNED:
case INTEGER_UNSIGNED_ZEROFILL:
case MEDIUMINT_UNSIGNED:
case MEDIUMINT_UNSIGNED_ZEROFILL:
case BIGINT:
@ -160,27 +188,49 @@ public class MySqlCdcCommonTypeUtils {
case DATE:
return DataTypes.DATE();
case DATETIME:
return column.length() >= 0
? DataTypes.TIMESTAMP(column.length())
: DataTypes.TIMESTAMP(0);
case TIMESTAMP:
return column.length() >= 0
? DataTypes.TIMESTAMP_LTZ(column.length())
: DataTypes.TIMESTAMP_LTZ();
: DataTypes.TIMESTAMP_LTZ(0);
case CHAR:
return DataTypes.CHAR(column.length());
case VARCHAR:
return DataTypes.VARCHAR(column.length());
case TINYTEXT:
case TEXT:
case MEDIUMTEXT:
case LONGTEXT:
case JSON:
case ENUM:
case GEOMETRY:
case POINT:
case LINESTRING:
case POLYGON:
case GEOMETRYCOLLECTION:
case GEOMCOLLECTION:
case MULTIPOINT:
case MULTIPOLYGON:
case MULTILINESTRING:
return DataTypes.STRING();
case BINARY:
return DataTypes.BINARY(column.length());
case VARBINARY:
return DataTypes.VARBINARY(column.length());
case TINYBLOB:
case BLOB:
case MEDIUMBLOB:
case LONGBLOB:
return DataTypes.BYTES();
case SET:
return DataTypes.ARRAY(DataTypes.STRING());
default:
throw new UnsupportedOperationException(
String.format("Don't support MySQL type '%s' yet.", typeName));
}
}
private MySqlCdcCommonTypeUtils() {}
private MySqlTypeUtils() {}
}

@ -1,67 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium.event;
import com.ververica.cdc.common.types.DataField;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import org.apache.kafka.connect.data.Schema;
/** Utility class to convert {@link Schema} to {@link DataType}. */
public class ConnectSchemaTypeInference {

    /** Non-instantiable utility class. */
    private ConnectSchemaTypeInference() {}

    /**
     * Infers the {@link DataType} of the given Kafka Connect {@link Schema}, honoring its
     * optionality: a non-optional schema yields a NOT NULL data type.
     */
    public static DataType infer(Schema schema) {
        return schema.isOptional()
                ? infer(schema, schema.type())
                : infer(schema, schema.type()).notNull();
    }

    /**
     * Maps a Connect {@link Schema.Type} to the corresponding {@link DataType}, recursing into
     * element/key/value/field schemas for container types.
     *
     * @throws UnsupportedOperationException if the Connect type has no data-type counterpart
     */
    private static DataType infer(Schema schema, Schema.Type type) {
        switch (type) {
            case INT8:
                return DataTypes.TINYINT();
            case INT16:
                return DataTypes.SMALLINT();
            case INT32:
                return DataTypes.INT();
            case INT64:
                return DataTypes.BIGINT();
            case FLOAT32:
                return DataTypes.FLOAT();
            case FLOAT64:
                return DataTypes.DOUBLE();
            case BOOLEAN:
                return DataTypes.BOOLEAN();
            case STRING:
                return DataTypes.STRING();
            case BYTES:
                return DataTypes.BYTES();
            case ARRAY:
                return DataTypes.ARRAY(infer(schema.valueSchema()));
            case MAP:
                return DataTypes.MAP(infer(schema.keySchema()), infer(schema.valueSchema()));
            case STRUCT:
                return DataTypes.ROW(
                        schema.fields().stream()
                                .map(f -> DataTypes.FIELD(f.name(), infer(f.schema())))
                                .toArray(DataField[]::new));
            default:
                throw new UnsupportedOperationException(
                        "Unsupported type: " + schema.type().getName());
        }
    }
}

@ -76,6 +76,9 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
private static final Map<DataType, DeserializationRuntimeConverter> CONVERTERS =
new ConcurrentHashMap<>();
/** The schema data type inference. */
protected final SchemaDataTypeInference schemaDataTypeInference;
/** Changelog Mode to use for encoding changes in Flink internal data structure. */
protected final DebeziumChangelogMode changelogMode;
@ -83,7 +86,10 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
protected final ZoneId serverTimeZone;
public DebeziumEventDeserializationSchema(
DebeziumChangelogMode changelogMode, ZoneId serverTimeZone) {
SchemaDataTypeInference schemaDataTypeInference,
DebeziumChangelogMode changelogMode,
ZoneId serverTimeZone) {
this.schemaDataTypeInference = schemaDataTypeInference;
this.changelogMode = changelogMode;
this.serverTimeZone = serverTimeZone;
}
@ -141,14 +147,12 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
}
private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
DataType dataType = ConnectSchemaTypeInference.infer(valueSchema);
return (RecordData)
getOrCreateConverter(dataType, serverTimeZone).convert(value, valueSchema);
DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
}
private static DeserializationRuntimeConverter getOrCreateConverter(
DataType type, ZoneId serverTimeZone) {
return CONVERTERS.computeIfAbsent(type, t -> createConverter(t, serverTimeZone));
// Looks up the cached converter for the given type, creating and caching one on first use.
private DeserializationRuntimeConverter getOrCreateConverter(DataType type) {
    return CONVERTERS.computeIfAbsent(type, key -> createConverter(key));
}
// -------------------------------------------------------------------------------------
@ -156,9 +160,8 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
// -------------------------------------------------------------------------------------
/** Creates a runtime converter which is null safe. */
private static DeserializationRuntimeConverter createConverter(
DataType type, ZoneId serverTimeZone) {
return wrapIntoNullableConverter(createNotNullConverter(type, serverTimeZone));
// Builds the not-null converter for the type and wraps it so null inputs pass through safely.
private DeserializationRuntimeConverter createConverter(DataType type) {
    DeserializationRuntimeConverter notNullConverter = createNotNullConverter(type);
    return wrapIntoNullableConverter(notNullConverter);
}
// --------------------------------------------------------------------------------
@ -168,58 +171,57 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
// --------------------------------------------------------------------------------
/** Creates a runtime converter which assuming input object is not null. */
public static DeserializationRuntimeConverter createNotNullConverter(
DataType type, ZoneId serverTimeZone) {
protected DeserializationRuntimeConverter createNotNullConverter(DataType type) {
// if no matched user defined converter, fallback to the default converter
switch (type.getTypeRoot()) {
case BOOLEAN:
return convertToBoolean();
return this::convertToBoolean;
case TINYINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Byte.parseByte(dbzObj.toString());
}
};
return this::convertToByte;
case SMALLINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Short.parseShort(dbzObj.toString());
}
};
return this::convertToShort;
case INTEGER:
return convertToInt();
return this::convertToInt;
case BIGINT:
return convertToLong();
return this::convertToLong;
case DATE:
return convertToDate();
return this::convertToDate;
case TIME_WITHOUT_TIME_ZONE:
return convertToTime();
return this::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return convertToTimestamp(serverTimeZone);
return this::convertToTimestamp;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return convertToLocalTimeZoneTimestamp(serverTimeZone);
return this::convertToLocalTimeZoneTimestamp;
case FLOAT:
return convertToFloat();
return this::convertToFloat;
case DOUBLE:
return convertToDouble();
return this::convertToDouble;
case CHAR:
case VARCHAR:
return convertToString();
return this::convertToString;
case BINARY:
case VARBINARY:
return convertToBinary();
return this::convertToBinary;
case DECIMAL:
return createDecimalConverter((DecimalType) type);
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return convertToDecimal((DecimalType) type, dbzObj, schema);
}
};
case ROW:
return createRowConverter((RowType) type, serverTimeZone);
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
return convertToRecord((RowType) type, dbzObj, schema);
}
};
case ARRAY:
case MAP:
default:
@ -227,288 +229,186 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
}
}
private static DeserializationRuntimeConverter convertToBoolean() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Boolean) {
return dbzObj;
} else if (dbzObj instanceof Byte) {
return (byte) dbzObj == 1;
} else if (dbzObj instanceof Short) {
return (short) dbzObj == 1;
} else {
return Boolean.parseBoolean(dbzObj.toString());
}
}
};
// Accepts Boolean as-is; treats numeric 1 (Byte/Short) as true; otherwise parses the string form.
protected Object convertToBoolean(Object dbzObj, Schema schema) {
    if (dbzObj instanceof Boolean) {
        return dbzObj;
    }
    if (dbzObj instanceof Byte) {
        return ((Byte) dbzObj).byteValue() == 1;
    }
    if (dbzObj instanceof Short) {
        return ((Short) dbzObj).shortValue() == 1;
    }
    return Boolean.parseBoolean(dbzObj.toString());
}
private static DeserializationRuntimeConverter convertToInt() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return dbzObj;
} else if (dbzObj instanceof Long) {
return ((Long) dbzObj).intValue();
} else {
return Integer.parseInt(dbzObj.toString());
}
}
};
// Parses the value's string form into a byte.
protected Object convertToByte(Object dbzObj, Schema schema) {
    final String literal = dbzObj.toString();
    return Byte.parseByte(literal);
}
private static DeserializationRuntimeConverter convertToLong() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return ((Integer) dbzObj).longValue();
} else if (dbzObj instanceof Long) {
return dbzObj;
} else {
return Long.parseLong(dbzObj.toString());
}
}
};
// Parses the value's string form into a short.
protected Object convertToShort(Object dbzObj, Schema schema) {
    final String literal = dbzObj.toString();
    return Short.parseShort(literal);
}
private static DeserializationRuntimeConverter convertToDouble() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return ((Float) dbzObj).doubleValue();
} else if (dbzObj instanceof Double) {
return dbzObj;
} else {
return Double.parseDouble(dbzObj.toString());
}
}
};
// Narrows Long to int, passes Integer through, and falls back to parsing the string form.
protected Object convertToInt(Object dbzObj, Schema schema) {
    if (dbzObj instanceof Long) {
        return ((Long) dbzObj).intValue();
    }
    if (dbzObj instanceof Integer) {
        return dbzObj;
    }
    return Integer.parseInt(dbzObj.toString());
}
private static DeserializationRuntimeConverter convertToFloat() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return dbzObj;
} else if (dbzObj instanceof Double) {
return ((Double) dbzObj).floatValue();
} else {
return Float.parseFloat(dbzObj.toString());
}
}
};
// Widens Integer to long, passes Long through, and falls back to parsing the string form.
protected Object convertToLong(Object dbzObj, Schema schema) {
    if (dbzObj instanceof Long) {
        return dbzObj;
    }
    if (dbzObj instanceof Integer) {
        return ((Integer) dbzObj).longValue();
    }
    return Long.parseLong(dbzObj.toString());
}
private static DeserializationRuntimeConverter convertToDate() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
}
};
// Widens Float to double, passes Double through, and falls back to parsing the string form.
protected Object convertToDouble(Object dbzObj, Schema schema) {
    if (dbzObj instanceof Double) {
        return dbzObj;
    }
    if (dbzObj instanceof Float) {
        return ((Float) dbzObj).doubleValue();
    }
    return Double.parseDouble(dbzObj.toString());
}
private static DeserializationRuntimeConverter convertToTime() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case MicroTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000);
case NanoTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000_000);
}
} else if (dbzObj instanceof Integer) {
return dbzObj;
}
// get number of milliseconds of the day
return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
}
};
// Passes Float through, narrows Double to float, and falls back to parsing the string form.
protected Object convertToFloat(Object dbzObj, Schema schema) {
    if (dbzObj instanceof Float) {
        return dbzObj;
    }
    if (dbzObj instanceof Double) {
        return ((Double) dbzObj).floatValue();
    }
    return Float.parseFloat(dbzObj.toString());
}
private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromMillis(
micro / 1000, (int) (micro % 1000 * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromMillis(
nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime =
TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
}
};
// Converts the value to a LocalDate and returns its epoch-day count as an int.
protected Object convertToDate(Object dbzObj, Schema schema) {
    final long epochDay = TemporalConversions.toLocalDate(dbzObj).toEpochDay();
    return (int) epochDay;
}
private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
// TIMESTAMP_LTZ type is encoded in string type
Instant instant = Instant.parse(str);
return TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(instant, serverTimeZone));
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
protected Object convertToTime(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case MicroTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000);
case NanoTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000_000);
}
};
} else if (dbzObj instanceof Integer) {
return dbzObj;
}
// get number of milliseconds of the day
return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
}
private static DeserializationRuntimeConverter convertToString() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return BinaryStringData.fromString(dbzObj.toString());
protected Object convertToTimestamp(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromMillis(micro / 1000, (int) (micro % 1000 * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000));
}
};
}
LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
}
private static DeserializationRuntimeConverter convertToBinary() {
return new DeserializationRuntimeConverter() {
protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
// TIMESTAMP_LTZ type is encoded in string type
Instant instant = Instant.parse(str);
return TimestampData.fromMillis(instant.toEpochMilli(), instant.getNano());
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}
private static final long serialVersionUID = 1L;
protected Object convertToString(Object dbzObj, Schema schema) {
return BinaryStringData.fromString(dbzObj.toString());
}
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof byte[]) {
return dbzObj;
} else if (dbzObj instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
} else {
throw new UnsupportedOperationException(
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
}
}
};
protected Object convertToBinary(Object dbzObj, Schema schema) {
if (dbzObj instanceof byte[]) {
return dbzObj;
} else if (dbzObj instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
} else {
throw new UnsupportedOperationException(
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
}
}
private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
protected Object convertToDecimal(DecimalType decimalType, Object dbzObj, Schema schema) {
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
BigDecimal bigDecimal;
if (dbzObj instanceof byte[]) {
// decimal.handling.mode=precise
bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
} else if (dbzObj instanceof String) {
// decimal.handling.mode=string
bigDecimal = new BigDecimal((String) dbzObj);
} else if (dbzObj instanceof Double) {
// decimal.handling.mode=double
bigDecimal = BigDecimal.valueOf((Double) dbzObj);
} else {
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
SpecialValueDecimal decimal =
VariableScaleDecimal.toLogical((Struct) dbzObj);
bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
} else {
// fallback to string
bigDecimal = new BigDecimal(dbzObj.toString());
}
}
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
BigDecimal bigDecimal;
if (dbzObj instanceof byte[]) {
// decimal.handling.mode=precise
bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
} else if (dbzObj instanceof String) {
// decimal.handling.mode=string
bigDecimal = new BigDecimal((String) dbzObj);
} else if (dbzObj instanceof Double) {
// decimal.handling.mode=double
bigDecimal = BigDecimal.valueOf((Double) dbzObj);
} else {
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
SpecialValueDecimal decimal = VariableScaleDecimal.toLogical((Struct) dbzObj);
bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
} else {
// fallback to string
bigDecimal = new BigDecimal(dbzObj.toString());
}
};
}
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
private static DeserializationRuntimeConverter createRowConverter(
RowType rowType, ZoneId serverTimeZone) {
final DeserializationRuntimeConverter[] fieldConverters =
protected Object convertToRecord(RowType rowType, Object dbzObj, Schema schema)
throws Exception {
DeserializationRuntimeConverter[] fieldConverters =
rowType.getFields().stream()
.map(DataField::getType)
.map(logicType -> createConverter(logicType, serverTimeZone))
.map(this::createConverter)
.toArray(DeserializationRuntimeConverter[]::new);
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
final BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
Struct struct = (Struct) dbzObj;
int arity = fieldNames.length;
Object[] fields = new Object[arity];
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
Field field = schema.field(fieldName);
if (field == null) {
fields[i] = null;
} else {
Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
Object convertedField =
convertField(fieldConverters[i], fieldValue, fieldSchema);
fields[i] = convertedField;
}
}
return generator.generate(fields);
String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
Struct struct = (Struct) dbzObj;
int arity = fieldNames.length;
Object[] fields = new Object[arity];
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
Field field = schema.field(fieldName);
if (field == null) {
fields[i] = null;
} else {
Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema);
fields[i] = convertedField;
}
};
}
return generator.generate(fields);
}
private static Object convertField(

@ -0,0 +1,198 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium.event;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.types.DataField;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Time;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Optional;
import static com.ververica.cdc.common.types.DecimalType.DEFAULT_PRECISION;
/** {@link DataType} inference for debezium {@link Schema}. */
@Internal
public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference, Serializable {
private static final long serialVersionUID = 1L;
@Override
public DataType infer(Object value, Schema schema) {
return schema.isOptional()
? infer(value, schema, schema.type())
: infer(value, schema, schema.type()).notNull();
}
protected DataType infer(Object value, Schema schema, Schema.Type type) {
switch (type) {
case INT8:
return inferInt8(value, schema);
case INT16:
return inferInt16(value, schema);
case INT32:
return inferInt32(value, schema);
case INT64:
return inferInt64(value, schema);
case FLOAT32:
return inferFloat32(value, schema);
case FLOAT64:
return inferFloat64(value, schema);
case BOOLEAN:
return inferBoolean(value, schema);
case STRING:
return inferString(value, schema);
case BYTES:
return inferBytes(value, schema);
case STRUCT:
return inferStruct(value, schema);
case ARRAY:
return inferArray(value, schema);
case MAP:
return inferMap(value, schema);
default:
throw new UnsupportedOperationException(
"Unsupported type: " + schema.type().getName());
}
}
protected DataType inferBoolean(Object value, Schema schema) {
return DataTypes.BOOLEAN();
}
protected DataType inferInt8(Object value, Schema schema) {
return DataTypes.TINYINT();
}
protected DataType inferInt16(Object value, Schema schema) {
return DataTypes.SMALLINT();
}
protected DataType inferInt32(Object value, Schema schema) {
if (Date.LOGICAL_NAME.equals(schema.name())) {
return DataTypes.DATE();
}
if (Time.SCHEMA_NAME.equals(schema.name())) {
return DataTypes.TIME(3);
}
return DataTypes.INT();
}
protected DataType inferInt64(Object value, Schema schema) {
if (MicroTime.SCHEMA_NAME.equals(schema.name())) {
return DataTypes.TIME(6);
}
if (NanoTime.SCHEMA_NAME.equals(schema.name())) {
return DataTypes.TIME(9);
}
if (Timestamp.SCHEMA_NAME.equals(schema.name())) {
return DataTypes.TIMESTAMP(3);
}
if (MicroTimestamp.SCHEMA_NAME.equals(schema.name())) {
return DataTypes.TIMESTAMP(6);
}
if (NanoTimestamp.SCHEMA_NAME.equals(schema.name())) {
return DataTypes.TIMESTAMP(9);
}
return DataTypes.BIGINT();
}
protected DataType inferFloat32(Object value, Schema schema) {
return DataTypes.FLOAT();
}
protected DataType inferFloat64(Object value, Schema schema) {
return DataTypes.DOUBLE();
}
protected DataType inferString(Object value, Schema schema) {
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
int nano =
Optional.ofNullable((String) value)
.map(Instant::parse)
.map(Instant::getNano)
.orElse(0);
int precision;
if (nano == 0) {
precision = 0;
} else if (nano % 1000 > 0) {
precision = 9;
} else if (nano % 1000_000 > 0) {
precision = 6;
} else if (nano % 1000_000_000 > 0) {
precision = 3;
} else {
precision = 0;
}
return DataTypes.TIMESTAMP(precision);
}
return DataTypes.STRING();
}
protected DataType inferBytes(Object value, Schema schema) {
if (Decimal.LOGICAL_NAME.equals(schema.name())
|| VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
if (value instanceof BigDecimal) {
BigDecimal decimal = (BigDecimal) value;
return DataTypes.DECIMAL(decimal.precision(), decimal.scale());
}
return DataTypes.DECIMAL(DEFAULT_PRECISION, 0);
}
return DataTypes.BYTES();
}
protected DataType inferStruct(Object value, Schema schema) {
Struct struct = (Struct) value;
if (Geometry.LOGICAL_NAME.equals(schema.name())
|| Point.LOGICAL_NAME.equals(schema.name())) {
return DataTypes.STRING();
}
return DataTypes.ROW(
schema.fields().stream()
.map(
f ->
DataTypes.FIELD(
f.name(), infer(struct.get(f.name()), f.schema())))
.toArray(DataField[]::new));
}
protected DataType inferArray(Object value, Schema schema) {
throw new UnsupportedOperationException("Unsupported type ARRAY");
}
protected DataType inferMap(Object value, Schema schema) {
throw new UnsupportedOperationException("Unsupported type MAP");
}
}

@ -0,0 +1,35 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium.event;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.types.DataType;
import org.apache.kafka.connect.data.Schema;
/** {@link DataType} inference for kafka connect {@link Schema}. */
@Internal
public interface SchemaDataTypeInference {

    /**
     * Infers the {@link DataType} of a value described by a kafka connect {@link Schema}.
     *
     * @param value the value corresponding to the given schema; implementations may inspect it to
     *     refine the inferred type (e.g. precision), so it may affect the result — TODO confirm
     *     whether {@code null} is permitted here
     * @param schema the kafka connect schema describing {@code value}
     * @return the inferred data type
     */
    DataType infer(Object value, Schema schema);
}

@ -68,10 +68,10 @@ public abstract class SourceRecordEventDeserializer implements EventDeserializer
protected abstract List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record)
throws Exception;
/** Get {@link TableId} from given record. */
/** Get {@link TableId} from data change record. */
protected abstract TableId getTableId(SourceRecord record);
/** Get metadata from given record. */
/** Get metadata from data change record. */
protected abstract Map<String, String> getMetadata(SourceRecord record);
public static Schema fieldSchema(Schema schema, String fieldName) {

@ -27,6 +27,12 @@ public class MySqlTypeUtils {
// ------ MySQL Type ------
// https://dev.mysql.com/doc/refman/8.0/en/data-types.html
private static final String BIT = "BIT";
/*
* BOOLEAN type will be returned when handling the change event from SQL like:
* ALTER TABLE `student` CHANGE COLUMN `is_male` `is_female` BOOLEAN NULL;
*/
private static final String BOOLEAN = "BOOLEAN";
private static final String BOOL = "BOOL";
private static final String TINYINT = "TINYINT";
private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL";
@ -39,6 +45,9 @@ public class MySqlTypeUtils {
private static final String INT = "INT";
private static final String INT_UNSIGNED = "INT UNSIGNED";
private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL";
private static final String INTEGER = "INTEGER";
private static final String INTEGER_UNSIGNED = "INTEGER UNSIGNED";
private static final String INTEGER_UNSIGNED_ZEROFILL = "INTEGER UNSIGNED ZEROFILL";
private static final String BIGINT = "BIGINT";
private static final String SERIAL = "SERIAL";
private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
@ -86,6 +95,14 @@ public class MySqlTypeUtils {
private static final String SET = "SET";
private static final String ENUM = "ENUM";
private static final String GEOMETRY = "GEOMETRY";
private static final String POINT = "POINT";
private static final String LINESTRING = "LINESTRING";
private static final String POLYGON = "POLYGON";
private static final String GEOMCOLLECTION = "GEOMCOLLECTION";
private static final String GEOMETRYCOLLECTION = "GEOMETRYCOLLECTION";
private static final String MULTIPOINT = "MULTIPOINT";
private static final String MULTIPOLYGON = "MULTIPOLYGON";
private static final String MULTILINESTRING = "MULTILINESTRING";
private static final String UNKNOWN = "UNKNOWN";
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
@ -105,6 +122,13 @@ public class MySqlTypeUtils {
private static DataType convertFromColumn(Column column) {
String typeName = column.typeName();
switch (typeName) {
case BIT:
return column.length() == 1
? DataTypes.BOOLEAN()
: DataTypes.BINARY(column.length() / 8);
case BOOL:
case BOOLEAN:
return DataTypes.BOOLEAN();
case TINYINT:
// MySQL doesn't have a native boolean type; it uses tinyint(1) to represent boolean values
// user should not use tinyint(1) to store number although jdbc url parameter
@ -118,10 +142,14 @@ public class MySqlTypeUtils {
case SMALLINT_UNSIGNED:
case SMALLINT_UNSIGNED_ZEROFILL:
case INT:
case INTEGER:
case MEDIUMINT:
case YEAR:
return DataTypes.INT();
case INT_UNSIGNED:
case INT_UNSIGNED_ZEROFILL:
case INTEGER_UNSIGNED:
case INTEGER_UNSIGNED_ZEROFILL:
case MEDIUMINT_UNSIGNED:
case MEDIUMINT_UNSIGNED_ZEROFILL:
case BIGINT:
@ -161,25 +189,49 @@ public class MySqlTypeUtils {
case DATE:
return DataTypes.DATE();
case DATETIME:
case TIMESTAMP:
return column.length() >= 0
? DataTypes.TIMESTAMP(column.length())
: DataTypes.TIMESTAMP();
: DataTypes.TIMESTAMP(0);
case TIMESTAMP:
return column.length() >= 0
? DataTypes.TIMESTAMP_LTZ(column.length())
: DataTypes.TIMESTAMP_LTZ(0);
case CHAR:
return DataTypes.CHAR(column.length());
case VARCHAR:
return DataTypes.VARCHAR(column.length());
case TINYTEXT:
case TEXT:
case MEDIUMTEXT:
case LONGTEXT:
case JSON:
case ENUM:
case GEOMETRY:
case POINT:
case LINESTRING:
case POLYGON:
case GEOMETRYCOLLECTION:
case GEOMCOLLECTION:
case MULTIPOINT:
case MULTIPOLYGON:
case MULTILINESTRING:
return DataTypes.STRING();
case BINARY:
return DataTypes.BINARY(column.length());
case VARBINARY:
return DataTypes.VARBINARY(column.length());
case TINYBLOB:
case BLOB:
case MEDIUMBLOB:
case LONGBLOB:
return DataTypes.BYTES();
case SET:
return DataTypes.ARRAY(DataTypes.STRING());
default:
throw new UnsupportedOperationException(
String.format("Don't support MySQL type '%s' yet.", typeName));
}
}
private MySqlTypeUtils() {}
}

@ -16,7 +16,6 @@
package com.ververica.cdc.connectors.mysql.source;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
@ -48,11 +47,13 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.mysql.table.StartupMode;

Loading…
Cancel
Save