[cdc-base][mysql] Fix split key comparison when may contain different types (#1499)

This closes #1496.
pull/1508/head
Leonard Xu 2 years ago committed by GitHub
parent dbd184ec36
commit 238305b2d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -30,6 +30,8 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
@ -197,14 +199,32 @@ public class SourceRecordUtils {
}
}
@SuppressWarnings("unchecked")
private static int compareObjects(Object o1, Object o2) {
if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
return ((Comparable) o1).compareTo(o2);
} else if (isNumericObject(o1) && isNumericObject(o2)) {
return toBigDecimal(o1).compareTo(toBigDecimal(o2));
} else {
return o1.toString().compareTo(o2.toString());
}
}
private static boolean isNumericObject(Object obj) {
return obj instanceof Byte
|| obj instanceof Short
|| obj instanceof Integer
|| obj instanceof Long
|| obj instanceof Float
|| obj instanceof Double
|| obj instanceof BigInteger
|| obj instanceof BigDecimal;
}
private static BigDecimal toBigDecimal(Object numericObj) {
return new BigDecimal(numericObj.toString());
}
public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws IOException {
Struct value = (Struct) schemaRecord.value();
String historyRecordStr = value.getString(HISTORY_RECORD_FIELD);

@ -33,6 +33,8 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
@ -334,14 +336,32 @@ public class RecordUtils {
}
}
@SuppressWarnings("unchecked")
private static int compareObjects(Object o1, Object o2) {
if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
return ((Comparable) o1).compareTo(o2);
} else if (isNumericObject(o1) && isNumericObject(o2)) {
return toBigDecimal(o1).compareTo(toBigDecimal(o2));
} else {
return o1.toString().compareTo(o2.toString());
}
}
private static boolean isNumericObject(Object obj) {
return obj instanceof Byte
|| obj instanceof Short
|| obj instanceof Integer
|| obj instanceof Long
|| obj instanceof Float
|| obj instanceof Double
|| obj instanceof BigInteger
|| obj instanceof BigDecimal;
}
private static BigDecimal toBigDecimal(Object numericObj) {
return new BigDecimal(numericObj.toString());
}
public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws IOException {
Struct value = (Struct) schemaRecord.value();
String historyRecordStr = value.getString(HISTORY_RECORD_FIELD);

@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.source.utils;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.splitKeyRangeContains;
@ -30,31 +31,75 @@ public class RecordUtilsTest {
@Test
public void testSplitKeyRangeContains() {
// table with only one split
assertTrue(splitKeyRangeContains(new Object[] {100L}, null, null));
assertKeyRangeContains(new Object[] {100L}, null, null);
// the last split
assertTrue(splitKeyRangeContains(new Object[] {101L}, new Object[] {100L}, null));
assertKeyRangeContains(new Object[] {101L}, new Object[] {100L}, null);
// the first split
assertTrue(splitKeyRangeContains(new Object[] {101L}, null, new Object[] {1024L}));
assertKeyRangeContains(new Object[] {101L}, null, new Object[] {1024L});
// general splits
assertTrue(
splitKeyRangeContains(
new Object[] {100L}, new Object[] {1L}, new Object[] {1024L}));
assertKeyRangeContains(new Object[] {100L}, new Object[] {1L}, new Object[] {1024L});
assertFalse(
splitKeyRangeContains(new Object[] {0L}, new Object[] {1L}, new Object[] {1024L}));
// split key from binlog may have different type
assertTrue(
splitKeyRangeContains(
new Object[] {BigInteger.valueOf(100L)},
new Object[] {1L},
new Object[] {1024L}));
assertKeyRangeContains(
new Object[] {BigInteger.valueOf(100L)}, new Object[] {1L}, new Object[] {1024L});
assertFalse(
splitKeyRangeContains(
new Object[] {BigInteger.valueOf(0L)},
new Object[] {1L},
new Object[] {1024L}));
}
@Test
public void testDifferentKeyTypes() {
// first split
assertKeyRangeContains(new Object[] {5}, null, new Object[] {Byte.valueOf("6")});
assertKeyRangeContains(new Object[] {5}, null, new Object[] {Short.valueOf("6")});
assertKeyRangeContains(new Object[] {5}, null, new Object[] {Integer.valueOf("6")});
assertKeyRangeContains(new Object[] {5}, null, new Object[] {Long.valueOf("6")});
assertKeyRangeContains(new Object[] {5}, null, new Object[] {BigInteger.valueOf(6)});
assertKeyRangeContains(new Object[] {5}, null, new Object[] {BigDecimal.valueOf(6)});
// other splits
assertKeyRangeContains(
new Object[] {Byte.valueOf("6")},
new Object[] {Byte.valueOf("6")},
new Object[] {BigDecimal.valueOf(100000L)});
assertKeyRangeContains(
new Object[] {Short.valueOf("60")},
new Object[] {Short.valueOf("6")},
new Object[] {BigDecimal.valueOf(100000L)});
assertKeyRangeContains(
new Object[] {Integer.valueOf("600")},
new Object[] {Integer.valueOf("6")},
new Object[] {BigDecimal.valueOf(100000L)});
assertKeyRangeContains(
new Object[] {Long.valueOf("6000")},
new Object[] {Long.valueOf("6")},
new Object[] {BigDecimal.valueOf(100000L)});
assertKeyRangeContains(
new Object[] {BigInteger.valueOf(60000)},
new Object[] {BigInteger.valueOf(6)},
new Object[] {BigDecimal.valueOf(100000L)});
assertKeyRangeContains(
new Object[] {BigDecimal.valueOf(60000)},
new Object[] {BigDecimal.valueOf(6)},
new Object[] {BigDecimal.valueOf(100000L)});
// last split
assertKeyRangeContains(new Object[] {7}, new Object[] {Byte.valueOf("6")}, null);
assertKeyRangeContains(new Object[] {7}, new Object[] {Short.valueOf("6")}, null);
assertKeyRangeContains(new Object[] {7}, new Object[] {Integer.valueOf("6")}, null);
assertKeyRangeContains(new Object[] {7}, new Object[] {Long.valueOf("6")}, null);
assertKeyRangeContains(new Object[] {7}, new Object[] {BigInteger.valueOf(6)}, null);
assertKeyRangeContains(new Object[] {7}, new Object[] {BigDecimal.valueOf(6)}, null);
}
private void assertKeyRangeContains(
Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
assertTrue(splitKeyRangeContains(key, splitKeyStart, splitKeyEnd));
}
}

Loading…
Cancel
Save