From 851dcc66b4a63fff8aa7c78dca74f1405b6fb18d Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Wed, 15 Sep 2021 15:24:02 +0800 Subject: [PATCH] [mysql] Fix the ClassCastException in RecordUtils this closes #330 --- .../mysql/source/utils/RecordUtils.java | 10 +- .../mysql/source/utils/SplitKeyUtils.java | 123 ------------------ .../mysql/source/utils/RecordUtilsTest.java | 62 +++++++++ 3 files changed, 63 insertions(+), 132 deletions(-) delete mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java create mode 100644 flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtilsTest.java diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java index 6050869b4..f1f1d955c 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -18,7 +18,6 @@ package com.ververica.cdc.connectors.mysql.source.utils; -import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind; @@ -348,15 +347,8 @@ public class RecordUtils { } } - /** Return the split key type can use numeric optimization or not. */ - public static boolean isOptimizedKeyType(LogicalTypeRoot typeRoot) { - return typeRoot == LogicalTypeRoot.BIGINT - || typeRoot == LogicalTypeRoot.INTEGER - || typeRoot == LogicalTypeRoot.DECIMAL; - } - private static int compareObjects(Object o1, Object o2) { - if (o1 instanceof Comparable) { + if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) { return ((Comparable) o1).compareTo(o2); } else { return o1.toString().compareTo(o2.toString()); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java deleted file mode 100644 index 97542e4b3..000000000 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.mysql.source.utils; - -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.util.Preconditions; - -import io.debezium.relational.Column; -import io.debezium.relational.Table; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -/** Utils to obtain and validate split key of table. */ -public class SplitKeyUtils { - - private SplitKeyUtils() {} - - /** - * Returns the split key type, the split key should be single field。 - * - *

The split key is primary key when primary key contains single field, the split key will be - * inferred when the primary key contains multiple field. - * - * @param definedPkType the defined primary key type in Flink. - * @param actualTable the table schema in MySQL database. - */ - public static RowType validateAndGetSplitKeyType(RowType definedPkType, Table actualTable) { - Preconditions.checkState( - definedPkType.getFieldCount() >= 1, - "The primary key is required in Flink SQL Table definition."); - Preconditions.checkState( - !actualTable.primaryKeyColumnNames().isEmpty(), - String.format( - "Only supports capture table with primary key, but the table %s has no primary key.", - actualTable.id())); - - validatePrimaryKey(definedPkType.getFieldNames(), actualTable.primaryKeyColumnNames()); - - if (definedPkType.getFieldCount() == 1) { - return definedPkType; - } else { - // use the first defined primary key used combine key. - return new RowType(Arrays.asList(definedPkType.getFields().get(0))); - } - } - - public static boolean splitKeyIsAutoIncremented(RowType splitKeyType, Table actualTable) { - final String splitKeyName = unquoteColumnName(splitKeyType.getFieldNames().get(0)); - final Column column = actualTable.columnWithName(splitKeyName); - return column != null && column.isAutoIncremented(); - } - - private static void validatePrimaryKey( - List definedPkFieldnames, List actualPkFieldNames) { - List formattedDefinedPk = - definedPkFieldnames.stream() - .map(SplitKeyUtils::unquoteColumnName) - .sorted() - .collect(Collectors.toList()); - List formattedActualPk = - actualPkFieldNames.stream() - .map(SplitKeyUtils::unquoteColumnName) - .sorted() - .collect(Collectors.toList()); - - String exceptionMsg = - String.format( - "The defined primary key %s in Flink is not matched with actual primary key %s in MySQL", - definedPkFieldnames, actualPkFieldNames); - Preconditions.checkState( - formattedDefinedPk.size() == formattedActualPk.size() - && formattedDefinedPk.containsAll(formattedActualPk), - exceptionMsg); - } - - public static String unquoteColumnName(String columnName) { - if (columnName == null) { - return null; - } - if (columnName.length() < 2) { - return columnName.toLowerCase(); - } - - Character quotingChar = deriveQuotingChar(columnName); - if (quotingChar != null) { - columnName = columnName.substring(1, columnName.length() - 1); - columnName = - columnName.replace( - quotingChar.toString() + quotingChar.toString(), - quotingChar.toString()); - } - return columnName.toLowerCase(); - } - - private static Character deriveQuotingChar(String columnName) { - char first = columnName.charAt(0); - char last = columnName.charAt(columnName.length() - 1); - - if (first == last && (first == '"' || first == '\'' || first == '`')) { - return first; - } - - return null; - } -} diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtilsTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtilsTest.java new file mode 100644 index 000000000..eb0a9b922 --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtilsTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.utils; + +import org.junit.Test; + +import java.math.BigInteger; + +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.splitKeyRangeContains; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link RecordUtils}. */ +public class RecordUtilsTest { + + @Test + public void testSplitKeyRangeContains() { + // table with only one split + assertTrue(splitKeyRangeContains(new Object[] {100L}, null, null)); + + // the last split + assertTrue(splitKeyRangeContains(new Object[] {101L}, new Object[] {100L}, null)); + + // the first split + assertTrue(splitKeyRangeContains(new Object[] {101L}, null, new Object[] {1024L})); + + // general splits + assertTrue( + splitKeyRangeContains( + new Object[] {100L}, new Object[] {1L}, new Object[] {1024L})); + assertFalse( + splitKeyRangeContains(new Object[] {0L}, new Object[] {1L}, new Object[] {1024L})); + + // split key from binlog may have different type + assertTrue( + splitKeyRangeContains( + new Object[] {BigInteger.valueOf(100L)}, + new Object[] {1L}, + new Object[] {1024L})); + assertFalse( + splitKeyRangeContains( + new Object[] {BigInteger.valueOf(0L)}, + new Object[] {1L}, + new Object[] {1024L})); + } +}