diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java index 6050869b4..f1f1d955c 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -18,7 +18,6 @@ package com.ververica.cdc.connectors.mysql.source.utils; -import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind; @@ -348,15 +347,8 @@ public class RecordUtils { } } - /** Return the split key type can use numeric optimization or not. */ - public static boolean isOptimizedKeyType(LogicalTypeRoot typeRoot) { - return typeRoot == LogicalTypeRoot.BIGINT - || typeRoot == LogicalTypeRoot.INTEGER - || typeRoot == LogicalTypeRoot.DECIMAL; - } - private static int compareObjects(Object o1, Object o2) { - if (o1 instanceof Comparable) { + if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) { return ((Comparable) o1).compareTo(o2); } else { return o1.toString().compareTo(o2.toString()); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java deleted file mode 100644 index 97542e4b3..000000000 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.mysql.source.utils; - -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.util.Preconditions; - -import io.debezium.relational.Column; -import io.debezium.relational.Table; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -/** Utils to obtain and validate split key of table. */ -public class SplitKeyUtils { - - private SplitKeyUtils() {} - - /** - * Returns the split key type, the split key should be single field。 - * - *
The split key is primary key when primary key contains single field, the split key will be
- * inferred when the primary key contains multiple field.
- *
- * @param definedPkType the defined primary key type in Flink.
- * @param actualTable the table schema in MySQL database.
- */
- public static RowType validateAndGetSplitKeyType(RowType definedPkType, Table actualTable) {
- Preconditions.checkState(
- definedPkType.getFieldCount() >= 1,
- "The primary key is required in Flink SQL Table definition.");
- Preconditions.checkState(
- !actualTable.primaryKeyColumnNames().isEmpty(),
- String.format(
- "Only supports capture table with primary key, but the table %s has no primary key.",
- actualTable.id()));
-
- validatePrimaryKey(definedPkType.getFieldNames(), actualTable.primaryKeyColumnNames());
-
- if (definedPkType.getFieldCount() == 1) {
- return definedPkType;
- } else {
- // use the first defined primary key used combine key.
- return new RowType(Arrays.asList(definedPkType.getFields().get(0)));
- }
- }
-
- public static boolean splitKeyIsAutoIncremented(RowType splitKeyType, Table actualTable) {
- final String splitKeyName = unquoteColumnName(splitKeyType.getFieldNames().get(0));
- final Column column = actualTable.columnWithName(splitKeyName);
- return column != null && column.isAutoIncremented();
- }
-
- private static void validatePrimaryKey(
- List