[hotfix][mysql] Trim default value for BIGINT and SMALLINT types when parsing MySQL DDL (#2418)

pull/2385/head
Hang Ruan 1 year ago committed by GitHub
parent 54df7e93f0
commit 8e7378d8f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,527 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import io.debezium.annotation.Immutable;
import io.debezium.relational.Column;
import io.debezium.relational.DefaultValueConverter;
import io.debezium.relational.ValueConverter;
import io.debezium.util.Collect;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Copied from Debezium project(1.9.7.final) to add BIGINT and SMALLINT to TRIM_DATA_TYPES. Remove
* this when https://issues.redhat.com/browse/DBZ-6824 is fixed.
*
* <p>Line 81 & 82: add BIGINT and SMALLINT to TRIM_DATA_TYPES.
*/
@Immutable
public class MySqlDefaultValueConverter implements DefaultValueConverter {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDefaultValueConverter.class);
private static final Pattern EPOCH_EQUIVALENT_TIMESTAMP =
Pattern.compile(
"(\\d{4}-\\d{2}-00|\\d{4}-00-\\d{2}|0000-\\d{2}-\\d{2}) (00:00:00(\\.\\d{1,6})?)");
private static final Pattern EPOCH_EQUIVALENT_DATE =
Pattern.compile("\\d{4}-\\d{2}-00|\\d{4}-00-\\d{2}|0000-\\d{2}-\\d{2}");
private static final String EPOCH_TIMESTAMP = "1970-01-01 00:00:00";
private static final String EPOCH_DATE = "1970-01-01";
private static final Pattern TIMESTAMP_PATTERN =
Pattern.compile("([0-9]*-[0-9]*-[0-9]*) ([0-9]*:[0-9]*:[0-9]*(\\.([0-9]*))?)");
private static final Pattern CHARSET_INTRODUCER_PATTERN =
Pattern.compile("^_[A-Za-z0-9]+'(.*)'$");
@Immutable
private static final Set<Integer> TRIM_DATA_TYPES =
Collect.unmodifiableSet(
Types.TINYINT,
Types.INTEGER,
Types.DATE,
Types.TIMESTAMP,
Types.TIMESTAMP_WITH_TIMEZONE,
Types.TIME,
Types.BOOLEAN,
Types.BIT,
Types.NUMERIC,
Types.DECIMAL,
Types.FLOAT,
Types.DOUBLE,
Types.REAL,
Types.BIGINT,
Types.SMALLINT);
@Immutable
private static final Set<Integer> NUMBER_DATA_TYPES =
Collect.unmodifiableSet(
Types.BIT,
Types.TINYINT,
Types.SMALLINT,
Types.INTEGER,
Types.BIGINT,
Types.FLOAT,
Types.REAL,
Types.DOUBLE,
Types.NUMERIC,
Types.DECIMAL);
private static final DateTimeFormatter ISO_LOCAL_DATE_WITH_OPTIONAL_TIME =
new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.optionalStart()
.appendLiteral(" ")
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.optionalEnd()
.toFormatter();
private final MySqlValueConverters converters;
public MySqlDefaultValueConverter(MySqlValueConverters converters) {
this.converters = converters;
}
/**
* This interface is used by a DDL parser to convert the string default value to a Java type
* recognized by value converters for a subset of types.
*
* @param column the column definition describing the {@code data} value; never null
* @param defaultValueExpression the default value literal; may be null
* @return value converted to a Java type; optional
*/
@Override
public Optional<Object> parseDefaultValue(Column column, String defaultValueExpression) {
Object logicalDefaultValue = convert(column, defaultValueExpression);
if (logicalDefaultValue == null) {
return Optional.empty();
}
final SchemaBuilder schemaBuilder = converters.schemaBuilder(column);
if (schemaBuilder == null) {
return Optional.of(logicalDefaultValue);
}
final Schema schema = schemaBuilder.build();
// In order to get the valueConverter for this column, we have to create a field;
// The index value -1 in the field will never used when converting default value;
// So we can set any number here;
final Field field = new Field(column.name(), -1, schema);
final ValueConverter valueConverter = converters.converter(column, field);
return Optional.ofNullable(valueConverter.convert(logicalDefaultValue));
}
/**
* Converts a default value from the expected format to a logical object acceptable by the main
* JDBC converter.
*
* @param column column definition
* @param value string formatted default value
* @return value converted to a Java type
*/
public Object convert(Column column, String value) {
if (value == null) {
return value;
}
// trim non varchar data types before converting
if (TRIM_DATA_TYPES.contains(column.jdbcType())) {
value = value.trim();
}
// strip character set introducer on default value expressions
value = stripCharacterSetIntroducer(value);
// boolean is also INT(1) or TINYINT(1)
if (NUMBER_DATA_TYPES.contains(column.jdbcType())
&& ("true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value))) {
if (Types.DECIMAL == column.jdbcType() || Types.NUMERIC == column.jdbcType()) {
return convertToDecimal(column, value.equalsIgnoreCase("true") ? "1" : "0");
}
return value.equalsIgnoreCase("true") ? 1 : 0;
}
switch (column.jdbcType()) {
case Types.DATE:
return convertToLocalDate(column, value);
case Types.TIMESTAMP:
return convertToLocalDateTime(column, value);
case Types.TIMESTAMP_WITH_TIMEZONE:
return convertToTimestamp(column, value);
case Types.TIME:
return convertToDuration(column, value);
case Types.BOOLEAN:
return convertToBoolean(value);
case Types.BIT:
return convertToBits(column, value);
case Types.NUMERIC:
case Types.DECIMAL:
return convertToDecimal(column, value);
case Types.FLOAT:
case Types.DOUBLE:
case Types.REAL:
return convertToDouble(value);
}
return value;
}
/**
* Converts a string object for an object type of {@link LocalDate} or {@link LocalDateTime} in
* case of MySql Date type. If the column definition allows null and default value is
* 0000-00-00, we need return null; else 0000-00-00 will be replaced with 1970-01-01;
*
* @param column the column definition describing the {@code data} value; never null
* @param value the string object to be converted into a {@link LocalDate} type or {@link
* LocalDateTime} in case of MySql Date type;
* @return the converted value;
*/
private Object convertToLocalDate(Column column, String value) {
final boolean zero =
EPOCH_EQUIVALENT_DATE.matcher(value).matches()
|| EPOCH_EQUIVALENT_TIMESTAMP.matcher(value).matches()
|| "0".equals(value);
if (zero && column.isOptional()) {
return null;
}
if (zero) {
value = EPOCH_DATE;
}
try {
return LocalDate.from(ISO_LOCAL_DATE_WITH_OPTIONAL_TIME.parse(value));
} catch (Exception e) {
LOGGER.warn(
"Invalid default value '{}' for date column '{}'; {}",
value,
column.name(),
e.getMessage());
if (column.isOptional()) {
return null;
} else {
return LocalDate.from(ISO_LOCAL_DATE_WITH_OPTIONAL_TIME.parse(EPOCH_DATE));
}
}
}
/**
* Converts a string object for an object type of {@link LocalDateTime}. If the column
* definition allows null and default value is 0000-00-00 00:00:00, we need return null, else
* 0000-00-00 00:00:00 will be replaced with 1970-01-01 00:00:00;
*
* @param column the column definition describing the {@code data} value; never null
* @param value the string object to be converted into a {@link LocalDateTime} type;
* @return the converted value;
*/
private Object convertToLocalDateTime(Column column, String value) {
final boolean matches =
EPOCH_EQUIVALENT_TIMESTAMP.matcher(value).matches() || "0".equals(value);
if (matches) {
if (column.isOptional()) {
return null;
}
value = EPOCH_TIMESTAMP;
}
try {
return LocalDateTime.from(timestampFormat(column.length()).parse(value));
} catch (Exception e) {
LOGGER.warn(
"Invalid default value '{}' for datetime column '{}'; {}",
value,
column.name(),
e.getMessage());
if (column.isOptional()) {
return null;
} else {
return LocalDateTime.from(timestampFormat(column.length()).parse(EPOCH_TIMESTAMP));
}
}
}
/**
* Converts a string object for an object type of {@link Timestamp}. If the column definition
* allows null and default value is 0000-00-00 00:00:00, we need return null, else 0000-00-00
* 00:00:00 will be replaced with 1970-01-01 00:00:00;
*
* @param column the column definition describing the {@code data} value; never null
* @param value the string object to be converted into a {@link Timestamp} type;
* @return the converted value;
*/
private Object convertToTimestamp(Column column, String value) {
final boolean matches =
EPOCH_EQUIVALENT_TIMESTAMP.matcher(value).matches()
|| "0".equals(value)
|| EPOCH_TIMESTAMP.equals(value);
if (matches) {
if (column.isOptional()) {
return null;
}
return Timestamp.from(Instant.EPOCH);
}
value = cleanTimestamp(value);
return Timestamp.valueOf(value).toInstant().atZone(ZoneId.systemDefault());
}
/**
* Converts a string object for an object type of {@link Duration}.
*
* @param column the column definition describing the {@code data} value; never null
* @param value the string object to be converted into a {@link Duration} type;
* @return the converted value;
*/
private Object convertToDuration(Column column, String value) {
Matcher matcher = TIMESTAMP_PATTERN.matcher(value);
if (matcher.matches()) {
value = matcher.group(2);
}
return MySqlValueConverters.stringToDuration(value);
}
/**
* Converts a string object for an expected JDBC type of {@link Types#DOUBLE}.
*
* @param value the string object to be converted into a {@link Types#DOUBLE} type;
* @return the converted value;
*/
private Object convertToDouble(String value) {
return Double.parseDouble(value);
}
/**
* Converts a string object for an expected JDBC type of {@link Types#DECIMAL}.
*
* @param column the column definition describing the {@code data} value; never null
* @param value the string object to be converted into a {@link Types#DECIMAL} type;
* @return the converted value;
*/
private Object convertToDecimal(Column column, String value) {
return column.scale().isPresent()
? new BigDecimal(value).setScale(column.scale().get(), RoundingMode.HALF_UP)
: new BigDecimal(value);
}
/**
* Converts a string object for an expected JDBC type of {@link Types#BIT}.
*
* @param column the column definition describing the {@code data} value; never null
* @param value the string object to be converted into a {@link Types#BIT} type;
* @return the converted value;
*/
private Object convertToBits(Column column, String value) {
if (column.length() > 1) {
return convertToBits(value);
}
return convertToBit(value);
}
private Object convertToBit(String value) {
try {
return Short.parseShort(value) != 0;
} catch (NumberFormatException ignore) {
return Boolean.parseBoolean(value);
}
}
private Object convertToBits(String value) {
int nums = value.length() / Byte.SIZE + (value.length() % Byte.SIZE == 0 ? 0 : 1);
byte[] bytes = new byte[nums];
for (int i = 0; i < nums; i++) {
int s = value.length() - Byte.SIZE < 0 ? 0 : value.length() - Byte.SIZE;
int e = value.length();
bytes[nums - i - 1] = (byte) Integer.parseInt(value.substring(s, e), 2);
value = value.substring(0, s);
}
return bytes;
}
/**
* Converts a string object for an expected JDBC type of {@link Types#BOOLEAN}.
*
* @param value the string object to be converted into a {@link Types#BOOLEAN} type;
* @return the converted value;
*/
private Object convertToBoolean(String value) {
try {
return Integer.parseInt(value) != 0;
} catch (NumberFormatException ignore) {
return Boolean.parseBoolean(value);
}
}
private DateTimeFormatter timestampFormat(int length) {
final DateTimeFormatterBuilder dtf =
new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd")
.optionalStart()
.appendLiteral(" ")
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.optionalEnd()
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0);
if (length > 0) {
dtf.appendFraction(ChronoField.MICRO_OF_SECOND, 0, length, true);
}
return dtf.toFormatter();
}
/**
* Clean input timestamp to yyyy-mm-dd hh:mm:ss[.fffffffff] format.
*
* @param s input timestamp
* @return cleaned timestamp
*/
private String cleanTimestamp(String s) {
if (s == null) {
throw new java.lang.IllegalArgumentException("null string");
}
s = s.trim();
// clean first dash
s = replaceFirstNonNumericSubstring(s, 0, '-');
// clean second dash
s = replaceFirstNonNumericSubstring(s, s.indexOf('-') + 1, '-');
// clean dividing space
s = replaceFirstNonNumericSubstring(s, s.indexOf('-', s.indexOf('-') + 1) + 1, ' ');
if (s.indexOf(' ') != -1) {
// clean first colon
s = replaceFirstNonNumericSubstring(s, s.indexOf(' ') + 1, ':');
if (s.indexOf(':') != -1) {
// clean second colon
s = replaceFirstNonNumericSubstring(s, s.indexOf(':') + 1, ':');
}
}
final int maxMonth = 12;
final int maxDay = 31;
// Parse the date
int firstDash = s.indexOf('-');
int secondDash = s.indexOf('-', firstDash + 1);
int dividingSpace = s.indexOf(' ');
// Parse the time
int firstColon = s.indexOf(':', dividingSpace + 1);
int secondColon = s.indexOf(':', firstColon + 1);
int period = s.indexOf('.', secondColon + 1);
int year = 0;
int month = 0;
int day = 0;
int hour = 0;
int minute = 0;
int second = 0;
// Get the date
int len = s.length();
boolean parsedDate = false;
if (firstDash > 0 && secondDash > firstDash) {
year = Integer.parseInt(s.substring(0, firstDash));
month = Integer.parseInt(s.substring(firstDash + 1, secondDash));
if (dividingSpace != -1) {
day = Integer.parseInt(s.substring(secondDash + 1, dividingSpace));
} else {
day = Integer.parseInt(s.substring(secondDash + 1, len));
}
if ((month >= 1 && month <= maxMonth) && (day >= 1 && day <= maxDay)) {
parsedDate = true;
}
}
if (!parsedDate) {
throw new java.lang.IllegalArgumentException("Cannot parse the date from " + s);
}
// Get the time. Hour, minute, second and colons are all optional
if (dividingSpace != -1 && dividingSpace < len - 1) {
if (firstColon == -1) {
hour = Integer.parseInt(s.substring(dividingSpace + 1, len));
} else {
hour = Integer.parseInt(s.substring(dividingSpace + 1, firstColon));
if (firstColon < len - 1) {
if (secondColon == -1) {
minute = Integer.parseInt(s.substring(firstColon + 1, len));
} else {
minute = Integer.parseInt(s.substring(firstColon + 1, secondColon));
if (secondColon < len - 1) {
if (period == -1) {
second = Integer.parseInt(s.substring(secondColon + 1, len));
} else {
second = Integer.parseInt(s.substring(secondColon + 1, period));
}
}
}
}
}
}
StringBuilder cleanedTimestamp = new StringBuilder();
cleanedTimestamp =
cleanedTimestamp.append(
String.format(
"%04d-%02d-%02d %02d:%02d:%02d",
year, month, day, hour, minute, second));
if (period != -1 && period < len - 1) {
cleanedTimestamp = cleanedTimestamp.append(".").append(s.substring(period + 1));
}
return cleanedTimestamp.toString();
}
/**
* Replace the first non-numeric substring.
*
* @param s the original string
* @param startIndex the beginning index, inclusive
* @param c the new character
* @return
*/
private String replaceFirstNonNumericSubstring(String s, int startIndex, char c) {
StringBuilder sb = new StringBuilder();
sb.append(s.substring(0, startIndex));
String rest = s.substring(startIndex);
sb.append(rest.replaceFirst("[^\\d]+", Character.toString(c)));
return sb.toString();
}
private String stripCharacterSetIntroducer(String value) {
final Matcher matcher = CHARSET_INTRODUCER_PATTERN.matcher(value);
return !matcher.matches() ? value : matcher.group(1);
}
}

@ -1412,7 +1412,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
customerDatabase.getUsername(),
customerDatabase.getPassword(),
customerDatabase.getDatabaseName(),
"default_value_test",
"default_value_test.*",
incrementalSnapshot,
getServerId(),
getSplitSize());
@ -1479,6 +1479,39 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
statement.execute(
"alter table default_value_test add column\n"
+ " `new_col` smallint(1) unsigned DEFAULT ' 31 ';");
statement.execute(
" CREATE TABLE default_value_test_ignore (\n"
+ " id INTEGER NOT NULL PRIMARY KEY, \n"
+ " tiny_c TINYINT DEFAULT ' 0 ', \n"
+ " boolean_c BOOLEAN DEFAULT ' 1 ', \n"
+ " tiny_un_z_c TINYINT UNSIGNED ZEROFILL DEFAULT ' 2 ', \n"
+ " small_c SMALLINT DEFAULT ' 3 ', \n"
+ " small_un_c SMALLINT UNSIGNED DEFAULT ' 4 ',\n"
+ " small_un_z_c SMALLINT UNSIGNED ZEROFILL DEFAULT ' 5 ', \n"
+ " medium_c MEDIUMINT DEFAULT ' 6 ', \n"
+ " medium_un_c MEDIUMINT UNSIGNED DEFAULT ' 7 ', \n"
+ " medium_un_z_c MEDIUMINT UNSIGNED ZEROFILL DEFAULT ' 8 ', \n"
+ " int_c INTEGER DEFAULT ' 9 ', \n"
+ " int_un_c INTEGER UNSIGNED DEFAULT ' 10 ', \n"
+ " int_un_z_c INTEGER UNSIGNED ZEROFILL DEFAULT ' 11 ',\n"
+ " int11_c INT(11) DEFAULT ' 12 ', \n"
+ " big_c BIGINT DEFAULT ' 13 ', \n"
+ " big_un_c BIGINT UNSIGNED DEFAULT ' 14 ', \n"
+ " big_un_z_c BIGINT UNSIGNED ZEROFILL DEFAULT ' 15 ', \n"
+ " decimal_c DECIMAL(8, 4) DEFAULT ' 16 ', \n"
+ " decimal_un_c DECIMAL(8, 4) UNSIGNED DEFAULT ' 17 ', \n"
+ " decimal_un_z_c DECIMAL(8, 4) UNSIGNED ZEROFILL DEFAULT ' 18 ', \n"
+ " numeric_c NUMERIC(6, 0) DEFAULT ' 19 ', \n"
+ " big_decimal_c DECIMAL(65, 1) DEFAULT ' 20 ',\n"
+ " real_c REAL DEFAULT ' 21.0',\n"
+ " float_c FLOAT DEFAULT ' 22.0',\n"
+ " float_un_c FLOAT UNSIGNED DEFAULT ' 23',\n"
+ " float_un_z_c FLOAT UNSIGNED ZEROFILL DEFAULT ' 24',\n"
+ " double_c DOUBLE DEFAULT ' 25',\n"
+ " double_un_c DOUBLE UNSIGNED DEFAULT ' 26',\n"
+ " double_un_z_c DOUBLE UNSIGNED ZEROFILL DEFAULT ' 27',\n"
+ " tiny_un_c TINYINT UNSIGNED DEFAULT ' 28 '"
+ " );");
}
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
jobClient.cancel().get();

Loading…
Cancel
Save