[FLINK-35912][cdc-connector] SqlServer CDC doesn't chunk UUID-typed columns correctly (#3497)

* resolve conflicts

* polish code to trigger ci

---------

Co-authored-by: Kael <kael@fts.dev>
Co-authored-by: gongzhongqiang <gongzhongqiang@gigacloudtech.com>
pull/3449/head
lipl 6 months ago committed by GitHub
parent 986f37b307
commit f6d1d4810a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -214,12 +214,12 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
}
/** ChunkEnd less than or equal to max. */
protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
return ObjectUtils.compare(chunkEnd, max) <= 0;
}
/** ChunkEnd greater than or equal to max. */
protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
return ObjectUtils.compare(chunkEnd, max) >= 0;
}
@ -368,7 +368,7 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on PostgreSQL server
@ -397,7 +397,7 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (isChunkEndGeMax(chunkEnd, max)) {
if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
return null;
} else {
return chunkEnd;

@ -102,7 +102,7 @@ public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
/** ChunkEnd less than or equal to max. */
@Override
protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
@ -116,7 +116,7 @@ public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
/** ChunkEnd greater than or equal to max. */
@Override
protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =

@ -64,4 +64,13 @@ public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter {
throws SQLException {
return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
}
/**
 * ChunkEnd less than or equal to max.
 *
 * <p>Delegates to {@code SqlServerUtils.compare} so that the type of {@code splitColumn} is
 * taken into account when ordering the two values.
 */
@Override
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
    return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0;
}
/**
 * ChunkEnd greater than or equal to max.
 *
 * <p>Delegates to {@code SqlServerUtils.compare} so that the type of {@code splitColumn} is
 * taken into account when ordering the two values.
 */
@Override
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
    return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0;
}
}

@ -27,6 +27,9 @@ import java.sql.Types;
/** Utilities for converting from SqlServer types to Flink types. */
public class SqlServerTypeUtils {
/**
 * Microsoft SQL type GUID's type name.
 *
 * <p>NOTE(review): the constant's identifier is misspelled ("IDENTIFIRER" instead of
 * "IDENTIFIER"); the string value itself is spelled correctly. Renaming it requires updating
 * every reference (for example the type-name check in {@code SqlServerUtils.compare}).
 */
static final String UNIQUEIDENTIFIRER = "uniqueidentifier";
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
DataType dataType = convertFromColumn(column);

@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.sqlserver.source.utils;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.sqlserver.source.offset.LsnOffset;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
@ -40,14 +41,17 @@ import org.apache.kafka.connect.source.SourceRecord;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.DataTypes.FIELD;
@ -297,8 +301,7 @@ public class SqlServerUtils {
return buildSelectWithRowLimits(
tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
} else {
final String orderBy =
pkRowType.getFieldNames().stream().collect(Collectors.joining(", "));
final String orderBy = String.join(", ", pkRowType.getFieldNames());
return buildSelectWithBoundaryRowLimits(
tableId,
limitSize,
@ -322,7 +325,7 @@ public class SqlServerUtils {
StringBuilder sql = new StringBuilder();
for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
fieldNamesIt.hasNext(); ) {
sql.append("MAX(" + fieldNamesIt.next() + ")");
sql.append("MAX(").append(fieldNamesIt.next()).append(")");
if (fieldNamesIt.hasNext()) {
sql.append(" , ");
}
@ -342,12 +345,8 @@ public class SqlServerUtils {
}
sql.append(projection).append(" FROM ");
sql.append(quoteSchemaAndTable(tableId));
if (condition.isPresent()) {
sql.append(" WHERE ").append(condition.get());
}
if (orderBy.isPresent()) {
sql.append(" ORDER BY ").append(orderBy.get());
}
condition.ifPresent(s -> sql.append(" WHERE ").append(s));
orderBy.ifPresent(s -> sql.append(" ORDER BY ").append(s));
return sql.toString();
}
@ -396,11 +395,54 @@ public class SqlServerUtils {
sql.append(projection);
sql.append(" FROM ");
sql.append(quoteSchemaAndTable(tableId));
if (condition.isPresent()) {
sql.append(" WHERE ").append(condition.get());
}
condition.ifPresent(s -> sql.append(" WHERE ").append(s));
sql.append(" ORDER BY ").append(orderBy);
sql.append(") T");
return sql.toString();
}
/**
 * Compares two split-key values, taking the type of the split column into account.
 *
 * <p>When the column's type name equals {@code SqlServerTypeUtils.UNIQUEIDENTIFIRER}, both
 * values are converted to {@link UUID} via their {@code toString()} form and ordered with
 * {@link SQLServerUUIDComparator}. For every other column type the comparison is delegated to
 * {@code ObjectUtils.compare}.
 *
 * @param obj1 the left-hand value
 * @param obj2 the right-hand value
 * @param splitColumn the column both values were read from
 * @return a negative integer, zero, or a positive integer as {@code obj1} is less than, equal
 *     to, or greater than {@code obj2}
 */
public static int compare(Object obj1, Object obj2, Column splitColumn) {
    final String columnType = splitColumn.typeName();
    if (!columnType.equals(SqlServerTypeUtils.UNIQUEIDENTIFIRER)) {
        // Non-GUID columns use the generic ordering.
        return ObjectUtils.compare(obj1, obj2);
    }

    final Comparator<UUID> guidOrder = new SQLServerUUIDComparator();
    final UUID left = UUID.fromString(obj1.toString());
    final UUID right = UUID.fromString(obj2.toString());
    return guidOrder.compare(left, right);
}
/**
 * Comparator for SQL Server UUIDs. SQL Server compares UUIDs in a different order than Java.
 * Reference code: <a
 * href="https://github.com/dotnet/runtime/blob/5535e31a712343a63f5d7d796cd874e563e5ac14/src/libraries/System.Data.Common/src/System/Data/SQLTypes/SQLGuid.cs#L113">SQLGuid.cs::CompareTo</a>
 * Reference doc: <a
 * href="https://learn.microsoft.com/uk-ua/sql/connect/ado-net/sql/compare-guid-uniqueidentifier-values?view=sql-server-ver16">Comparing
 * GUID and uniqueidentifier values</a>
 *
 * <p>Each UUID is serialized to 16 bytes (most significant long first, then least significant
 * long) and the bytes are compared as unsigned values in the order given by {@code GUID_ORDER};
 * the first differing byte decides the result.
 *
 * <p>NOTE(review): the referenced .NET implementation applies this index order to the array
 * returned by {@code Guid.ToByteArray()}, whose first three groups are little-endian, whereas
 * the bytes produced here are big-endian. Values that differ only within bytes 0-7 may
 * therefore order differently -- confirm against a real SQL Server {@code ORDER BY}.
 */
static class SQLServerUUIDComparator implements Comparator<UUID> {

    /** Number of bytes in a GUID. */
    private static final int SIZE_OF_GUID = 16;

    /** Byte indexes in decreasing order of significance for the comparison. */
    private static final byte[] GUID_ORDER = {
        10, 11, 12, 13, 14, 15, 8, 9, 6, 7, 4, 5, 0, 1, 2, 3
    };

    /**
     * Compares two UUIDs byte by byte in {@code GUID_ORDER}.
     *
     * @param uuid1 the left-hand UUID
     * @param uuid2 the right-hand UUID
     * @return the difference of the first pair of differing bytes (treated as unsigned), or
     *     {@code 0} when all 16 bytes are equal
     */
    @Override
    public int compare(UUID uuid1, UUID uuid2) {
        byte[] bytes1 = uuidToBytes(uuid1);
        byte[] bytes2 = uuidToBytes(uuid2);

        for (int i = 0; i < SIZE_OF_GUID; i++) {
            byte b1 = bytes1[GUID_ORDER[i]];
            byte b2 = bytes2[GUID_ORDER[i]];
            if (b1 != b2) {
                // Unsigned byte comparison; both operands are in [0, 255], so no overflow.
                return (b1 & 0xFF) - (b2 & 0xFF);
            }
        }
        return 0;
    }

    /** Serializes a UUID to 16 bytes: most significant bits first, then least significant. */
    private static byte[] uuidToBytes(UUID uuid) {
        ByteBuffer bb = ByteBuffer.allocate(SIZE_OF_GUID);
        bb.putLong(uuid.getMostSignificantBits());
        bb.putLong(uuid.getLeastSignificantBits());
        return bb.array();
    }
}
}

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.sqlserver.source.utils;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
/** Unit test for {@link SqlServerUtils.SQLServerUUIDComparator}. */
public class SQLServerUUIDComparatorTest {

    /**
     * Sorts the same three GUIDs once with the generic {@code ObjectUtils.compare} ordering and
     * once with the SQL Server comparator, and checks that the two orderings differ as expected.
     */
    @Test
    public void testComparator() {
        SqlServerUtils.SQLServerUUIDComparator comparator =
                new SqlServerUtils.SQLServerUUIDComparator();

        // Create an ArrayList and fill it with Guid values.
        List<UUID> guidList = new ArrayList<>();
        guidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"));
        guidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"));
        guidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"));

        // Sort the Guids.
        guidList.sort(ObjectUtils::compare);

        // JUnit's contract is assertEquals(expected, actual).
        assertEquals(
                "1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE", guidList.get(0).toString().toUpperCase());
        assertEquals(
                "2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE", guidList.get(1).toString().toUpperCase());
        assertEquals(
                "3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE", guidList.get(2).toString().toUpperCase());

        // Create an ArrayList of SqlGuids.
        List<UUID> sqlGuidList = new ArrayList<>();
        sqlGuidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"));
        sqlGuidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"));
        sqlGuidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"));

        // Sort the SqlGuids. The unsorted SqlGuids are in the same order
        // as the unsorted Guid values.
        sqlGuidList.sort(comparator);

        // The sorted SqlGuid values are ordered differently than the Guid values.
        assertEquals(
                "2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE",
                sqlGuidList.get(0).toString().toUpperCase());
        assertEquals(
                "3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE",
                sqlGuidList.get(1).toString().toUpperCase());
        assertEquals(
                "1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE",
                sqlGuidList.get(2).toString().toUpperCase());
    }
}
Loading…
Cancel
Save