diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java index 524a3cd51..57c0704a9 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java @@ -30,6 +30,8 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; @@ -197,14 +199,32 @@ public class SourceRecordUtils { } } + @SuppressWarnings("unchecked") private static int compareObjects(Object o1, Object o2) { if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) { return ((Comparable) o1).compareTo(o2); + } else if (isNumericObject(o1) && isNumericObject(o2)) { + return toBigDecimal(o1).compareTo(toBigDecimal(o2)); } else { return o1.toString().compareTo(o2.toString()); } } + private static boolean isNumericObject(Object obj) { + return obj instanceof Byte + || obj instanceof Short + || obj instanceof Integer + || obj instanceof Long + || obj instanceof Float + || obj instanceof Double + || obj instanceof BigInteger + || obj instanceof BigDecimal; + } + + private static BigDecimal toBigDecimal(Object numericObj) { + return new BigDecimal(numericObj.toString()); + } + public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws IOException { Struct value = (Struct) schemaRecord.value(); String historyRecordStr = value.getString(HISTORY_RECORD_FIELD); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java index 0c31950c0..80af25c3f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -33,6 +33,8 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; @@ -334,14 +336,32 @@ public class RecordUtils { } } + @SuppressWarnings("unchecked") private static int compareObjects(Object o1, Object o2) { if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) { return ((Comparable) o1).compareTo(o2); + } else if (isNumericObject(o1) && isNumericObject(o2)) { + return toBigDecimal(o1).compareTo(toBigDecimal(o2)); } else { return o1.toString().compareTo(o2.toString()); } } + private static boolean isNumericObject(Object obj) { + return obj instanceof Byte + || obj instanceof Short + || obj instanceof Integer + || obj instanceof Long + || obj instanceof Float + || obj instanceof Double + || obj instanceof BigInteger + || obj instanceof BigDecimal; + } + + private static BigDecimal toBigDecimal(Object numericObj) { + return new BigDecimal(numericObj.toString()); + } + public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws IOException { Struct value = (Struct) schemaRecord.value(); String historyRecordStr = value.getString(HISTORY_RECORD_FIELD); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtilsTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtilsTest.java index 81f2cc03f..9dc795c16 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtilsTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtilsTest.java @@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.source.utils; import org.junit.Test; +import java.math.BigDecimal; import java.math.BigInteger; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.splitKeyRangeContains; @@ -30,31 +31,75 @@ public class RecordUtilsTest { @Test public void testSplitKeyRangeContains() { // table with only one split - assertTrue(splitKeyRangeContains(new Object[] {100L}, null, null)); - + assertKeyRangeContains(new Object[] {100L}, null, null); // the last split - assertTrue(splitKeyRangeContains(new Object[] {101L}, new Object[] {100L}, null)); + assertKeyRangeContains(new Object[] {101L}, new Object[] {100L}, null); // the first split - assertTrue(splitKeyRangeContains(new Object[] {101L}, null, new Object[] {1024L})); + assertKeyRangeContains(new Object[] {101L}, null, new Object[] {1024L}); // general splits - assertTrue( - splitKeyRangeContains( - new Object[] {100L}, new Object[] {1L}, new Object[] {1024L})); + assertKeyRangeContains(new Object[] {100L}, new Object[] {1L}, new Object[] {1024L}); assertFalse( splitKeyRangeContains(new Object[] {0L}, new Object[] {1L}, new Object[] {1024L})); // split key from binlog may have different type - assertTrue( - splitKeyRangeContains( - new Object[] {BigInteger.valueOf(100L)}, - new Object[] {1L}, - new Object[] {1024L})); + assertKeyRangeContains( + new Object[] {BigInteger.valueOf(100L)}, new Object[] {1L}, new Object[] {1024L}); assertFalse( splitKeyRangeContains( new Object[] {BigInteger.valueOf(0L)}, new Object[] {1L}, new Object[] {1024L})); } + + @Test + public void testDifferentKeyTypes() { + // first split + assertKeyRangeContains(new Object[] {5}, null, new Object[] {Byte.valueOf("6")}); + assertKeyRangeContains(new Object[] {5}, null, new Object[] {Short.valueOf("6")}); + assertKeyRangeContains(new Object[] {5}, null, new Object[] {Integer.valueOf("6")}); + assertKeyRangeContains(new Object[] {5}, null, new Object[] {Long.valueOf("6")}); + assertKeyRangeContains(new Object[] {5}, null, new Object[] {BigInteger.valueOf(6)}); + assertKeyRangeContains(new Object[] {5}, null, new Object[] {BigDecimal.valueOf(6)}); + + // other splits + assertKeyRangeContains( + new Object[] {Byte.valueOf("6")}, + new Object[] {Byte.valueOf("6")}, + new Object[] {BigDecimal.valueOf(100000L)}); + assertKeyRangeContains( + new Object[] {Short.valueOf("60")}, + new Object[] {Short.valueOf("6")}, + new Object[] {BigDecimal.valueOf(100000L)}); + assertKeyRangeContains( + new Object[] {Integer.valueOf("600")}, + new Object[] {Integer.valueOf("6")}, + new Object[] {BigDecimal.valueOf(100000L)}); + assertKeyRangeContains( + new Object[] {Long.valueOf("6000")}, + new Object[] {Long.valueOf("6")}, + new Object[] {BigDecimal.valueOf(100000L)}); + assertKeyRangeContains( + new Object[] {BigInteger.valueOf(60000)}, + new Object[] {BigInteger.valueOf(6)}, + new Object[] {BigDecimal.valueOf(100000L)}); + assertKeyRangeContains( + new Object[] {BigDecimal.valueOf(60000)}, + new Object[] {BigDecimal.valueOf(6)}, + new Object[] {BigDecimal.valueOf(100000L)}); + + // last split + assertKeyRangeContains(new Object[] {7}, new Object[] {Byte.valueOf("6")}, null); + assertKeyRangeContains(new Object[] {7}, new Object[] {Short.valueOf("6")}, null); + assertKeyRangeContains(new Object[] {7}, new Object[] {Integer.valueOf("6")}, null); + assertKeyRangeContains(new Object[] {7}, new Object[] {Long.valueOf("6")}, null); + assertKeyRangeContains(new Object[] {7}, new Object[] {BigInteger.valueOf(6)}, null); + assertKeyRangeContains(new Object[] {7}, new Object[] {BigDecimal.valueOf(6)}, null); + } + + private void assertKeyRangeContains( + Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) { + assertTrue(splitKeyRangeContains(key, splitKeyStart, splitKeyEnd)); + } }