[cdc-pipeline-connector][mysql] Fix precision problem of BIT type conversion (#2820)

This closes #2820.
pull/2822/head
Jiabao Sun 1 year ago committed by GitHub
parent f970444efa
commit 8a08ded410
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,9 +18,13 @@ package com.ververica.cdc.connectors.mysql.source.parser;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@ -34,6 +38,241 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
this.parsedEvents = new LinkedList<>();
}
// Overriding this method because the BIT type requires default length dimension of 1.
// Remove it when debezium fixed this issue.
@Override
protected DataTypeResolver initializeDataTypeResolver() {
DataTypeResolver.Builder dataTypeResolverBuilder = new DataTypeResolver.Builder();
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.StringDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHAR),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.CHAR, MySqlParser.VARYING),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.VARCHAR),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TINYTEXT),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TEXT),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.MEDIUMTEXT),
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONGTEXT),
new DataTypeResolver.DataTypeEntry(Types.NCHAR, MySqlParser.NCHAR),
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARYING),
new DataTypeResolver.DataTypeEntry(Types.NVARCHAR, MySqlParser.NVARCHAR),
new DataTypeResolver.DataTypeEntry(
Types.CHAR, MySqlParser.CHAR, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.VARCHAR, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.TINYTEXT, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.TEXT, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.MEDIUMTEXT, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.LONGTEXT, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.NCHAR, MySqlParser.NCHAR, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR, MySqlParser.NVARCHAR, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHARACTER),
new DataTypeResolver.DataTypeEntry(
Types.VARCHAR, MySqlParser.CHARACTER, MySqlParser.VARYING)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.NationalStringDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR, MySqlParser.NATIONAL, MySqlParser.VARCHAR)
.setSuffixTokens(MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.NCHAR, MySqlParser.NATIONAL, MySqlParser.CHARACTER)
.setSuffixTokens(MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARCHAR)
.setSuffixTokens(MySqlParser.BINARY)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.NationalVaryingStringDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR,
MySqlParser.NATIONAL,
MySqlParser.CHAR,
MySqlParser.VARYING),
new DataTypeResolver.DataTypeEntry(
Types.NVARCHAR,
MySqlParser.NATIONAL,
MySqlParser.CHARACTER,
MySqlParser.VARYING)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.DimensionDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.TINYINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT1)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.SMALLINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT2)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MEDIUMINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT3)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MIDDLEINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INTEGER)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT4)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.BIGINT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.INT8)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.REAL, MySqlParser.REAL)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.DOUBLE)
.setSuffixTokens(
MySqlParser.PRECISION,
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.FLOAT8)
.setSuffixTokens(
MySqlParser.PRECISION,
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT4)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL),
new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DECIMAL)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL)
.setDefaultLengthScaleDimension(10, 0),
new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DEC)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL)
.setDefaultLengthScaleDimension(10, 0),
new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.FIXED)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL)
.setDefaultLengthScaleDimension(10, 0),
new DataTypeResolver.DataTypeEntry(Types.NUMERIC, MySqlParser.NUMERIC)
.setSuffixTokens(
MySqlParser.SIGNED,
MySqlParser.UNSIGNED,
MySqlParser.ZEROFILL)
.setDefaultLengthScaleDimension(10, 0),
new DataTypeResolver.DataTypeEntry(Types.BIT, MySqlParser.BIT)
.setDefaultLengthDimension(1),
new DataTypeResolver.DataTypeEntry(Types.TIME, MySqlParser.TIME),
new DataTypeResolver.DataTypeEntry(
Types.TIMESTAMP_WITH_TIMEZONE, MySqlParser.TIMESTAMP),
new DataTypeResolver.DataTypeEntry(Types.TIMESTAMP, MySqlParser.DATETIME),
new DataTypeResolver.DataTypeEntry(Types.BINARY, MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(Types.VARBINARY, MySqlParser.VARBINARY),
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.BLOB),
new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.YEAR)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.SimpleDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.DATE, MySqlParser.DATE),
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.TINYBLOB),
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.MEDIUMBLOB),
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONGBLOB),
new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOL),
new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOLEAN),
new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.SERIAL)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.CollectionDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.ENUM)
.setSuffixTokens(MySqlParser.BINARY),
new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.SET)
.setSuffixTokens(MySqlParser.BINARY)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.SpatialDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(
Types.OTHER, MySqlParser.GEOMETRYCOLLECTION),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMCOLLECTION),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.LINESTRING),
new DataTypeResolver.DataTypeEntry(
Types.OTHER, MySqlParser.MULTILINESTRING),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOINT),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOLYGON),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POINT),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POLYGON),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.JSON),
new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMETRY)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.LongVarbinaryDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONG)
.setSuffixTokens(MySqlParser.VARBINARY)));
dataTypeResolverBuilder.registerDataTypes(
MySqlParser.LongVarcharDataTypeContext.class.getCanonicalName(),
Arrays.asList(
new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONG)
.setSuffixTokens(MySqlParser.VARCHAR)));
return dataTypeResolverBuilder.build();
}
@Override
protected AntlrDdlParserListener createParseTreeWalkerListener() {
return new CustomMySqlAntlrDdlParserListener(this, parsedEvents);

@ -124,7 +124,7 @@ public class MySqlTypeUtils {
case BIT:
return column.length() == 1
? DataTypes.BOOLEAN()
: DataTypes.BINARY(column.length() / 8);
: DataTypes.BINARY((column.length() + 7) / 8);
case BOOL:
case BOOLEAN:
return DataTypes.BOOLEAN();

@ -274,6 +274,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
// Decimal precision larger than 38 will be treated as string.
BinaryStringData.fromString("34567892.1"),
false,
new byte[] {3},
true,
true,
parseHexBinary("651aed08-390f-4893-b2f1-36923e7b7400".replace("-", "")),
@ -308,7 +309,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
expectedSnapshot[30] = null;
// The json string from binlog will remove useless space
expectedSnapshot[43] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
Object[] expectedStreamRecord = expectedSnapshot;
List<Event> streamResults = fetchResults(iterator, 1);
@ -406,6 +407,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
// Decimal precision larger than 38 will be treated as string.
DataTypes.STRING(),
DataTypes.BOOLEAN(),
DataTypes.BINARY(1),
DataTypes.BOOLEAN(),
DataTypes.BOOLEAN(),
DataTypes.BINARY(16),

@ -265,6 +265,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
// string.
DataTypes.STRING(),
DataTypes.BOOLEAN(),
DataTypes.BINARY(1),
DataTypes.BOOLEAN(),
DataTypes.BOOLEAN(),
DataTypes.BINARY(16),
@ -319,6 +320,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
"numeric_c",
"big_decimal_c",
"bit1_c",
"bit3_c",
"tiny1_c",
"boolean_c",
"file_uuid",

@ -289,6 +289,17 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"cols4", DataTypes.VARCHAR(55))))));
statement.execute(
String.format(
"ALTER TABLE `%s`.`products` ADD COLUMN `cols5` BIT NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("cols5", DataTypes.BOOLEAN())))));
}
List<Event> actual = fetchResults(events, expected.size());
assertThat(actual).isEqualTo(expected);

@ -49,6 +49,7 @@ CREATE TABLE common_types
numeric_c NUMERIC(6, 0),
big_decimal_c DECIMAL(65, 1),
bit1_c BIT,
bit3_c BIT(3),
tiny1_c TINYINT(1),
boolean_c BOOLEAN,
file_uuid BINARY(16),
@ -78,7 +79,7 @@ VALUES (DEFAULT, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215
18446744073709551615, 18446744073709551615,
'Hello World', 'abc', 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445,
123.4567, 123.4568, 123.4569, 345.6, 34567892.1,
0, 1, true,
0, b'011', 1, true,
unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400', '-', '')),
b'0000010000000100000001000000010000000100000001000000010000000100',
'text', UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 2021,

@ -49,6 +49,7 @@ CREATE TABLE common_types
numeric_c NUMERIC(6, 0),
big_decimal_c DECIMAL(65, 1),
bit1_c BIT,
bit3_c BIT(3),
tiny1_c TINYINT(1),
boolean_c BOOLEAN,
file_uuid BINARY(16),
@ -77,7 +78,8 @@ VALUES (DEFAULT, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215
4294967295, 4294967295, 2147483647, 9223372036854775807,
18446744073709551615, 18446744073709551615,
'Hello World', 'abc', 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445,
123.4567, 123.4568, 123.4569, 345.6, 34567892.1, 0, 1, true,
123.4567, 123.4568, 123.4569, 345.6, 34567892.1,
0, b'011', 1, true,
unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400', '-', '')),
b'0000010000000100000001000000010000000100000001000000010000000100',
'text', UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), UNHEX(HEX(16)), 2021,

@ -125,7 +125,7 @@ public class MySqlTypeUtils {
case BIT:
return column.length() == 1
? DataTypes.BOOLEAN()
: DataTypes.BINARY(column.length() / 8);
: DataTypes.BINARY((column.length() + 7) / 8);
case BOOL:
case BOOLEAN:
return DataTypes.BOOLEAN();

Loading…
Cancel
Save