diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java index 1d50164cc..508e8cb9b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java @@ -214,12 +214,12 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter { } /** ChunkEnd less than or equal to max. */ - protected boolean isChunkEndLeMax(Object chunkEnd, Object max) { + protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) { return ObjectUtils.compare(chunkEnd, max) <= 0; } /** ChunkEnd greater than or equal to max. */ - protected boolean isChunkEndGeMax(Object chunkEnd, Object max) { + protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) { return ObjectUtils.compare(chunkEnd, max) >= 0; } @@ -368,7 +368,7 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter { Object chunkStart = null; Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); int count = 0; - while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) { + while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) { // we start from [null, min + chunk_size) and avoid [null, min) splits.add(ChunkRange.of(chunkStart, chunkEnd)); // may sleep a while to avoid DDOS on PostgreSQL server @@ -397,7 +397,7 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter { // should query the next one larger than chunkEnd chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); } - if (isChunkEndGeMax(chunkEnd, max)) { + if (isChunkEndGeMax(chunkEnd, max, splitColumn)) { return null; } else { return chunkEnd; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java index adf4c98a9..ffcdb4bb2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java @@ -102,7 +102,7 @@ public class OracleChunkSplitter extends JdbcSourceChunkSplitter { /** ChunkEnd less than or equal to max. */ @Override - protected boolean isChunkEndLeMax(Object chunkEnd, Object max) { + protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) { boolean chunkEndMaxCompare; if (chunkEnd instanceof ROWID && max instanceof ROWID) { chunkEndMaxCompare = @@ -116,7 +116,7 @@ public class OracleChunkSplitter extends JdbcSourceChunkSplitter { /** ChunkEnd greater than or equal to max. */ @Override - protected boolean isChunkEndGeMax(Object chunkEnd, Object max) { + protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) { boolean chunkEndMaxCompare; if (chunkEnd instanceof ROWID && max instanceof ROWID) { chunkEndMaxCompare = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java index fb338a0ea..e79d0bcfa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java @@ -64,4 +64,13 @@ public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter { throws SQLException { return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId); } + + protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) { + return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0; + } + + /** ChunkEnd greater than or equal to max. */ + protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) { + return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java index 3d7163189..8f1853837 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java @@ -27,6 +27,9 @@ import java.sql.Types; /** Utilities for converting from SqlServer types to Flink types. */ public class SqlServerTypeUtils { + /** Microsoft SQL type GUID's type name. */ + static final String UNIQUEIDENTIFIRER = "uniqueidentifier"; + /** Returns a corresponding Flink data type from a debezium {@link Column}. */ public static DataType fromDbzColumn(Column column) { DataType dataType = convertFromColumn(column); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java index b2292a825..e389a5128 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.sqlserver.source.utils; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import org.apache.flink.cdc.connectors.base.utils.ObjectUtils; import org.apache.flink.cdc.connectors.sqlserver.source.offset.LsnOffset; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.logical.RowType; @@ -40,14 +41,17 @@ import org.apache.kafka.connect.source.SourceRecord; import javax.annotation.Nullable; +import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.stream.Collectors; import static org.apache.flink.table.api.DataTypes.FIELD; @@ -297,8 +301,7 @@ public class SqlServerUtils { return buildSelectWithRowLimits( tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); } else { - final String orderBy = - pkRowType.getFieldNames().stream().collect(Collectors.joining(", ")); + final String orderBy = String.join(", ", pkRowType.getFieldNames()); return buildSelectWithBoundaryRowLimits( tableId, limitSize, @@ -322,7 +325,7 @@ public class SqlServerUtils { StringBuilder sql = new StringBuilder(); for (Iterator fieldNamesIt = pkRowType.getFieldNames().iterator(); fieldNamesIt.hasNext(); ) { - sql.append("MAX(" + fieldNamesIt.next() + ")"); + sql.append("MAX(").append(fieldNamesIt.next()).append(")"); if (fieldNamesIt.hasNext()) { sql.append(" , "); } @@ -342,12 +345,8 @@ public class SqlServerUtils { } sql.append(projection).append(" FROM "); sql.append(quoteSchemaAndTable(tableId)); - if (condition.isPresent()) { - sql.append(" WHERE ").append(condition.get()); - } - if (orderBy.isPresent()) { - sql.append(" ORDER BY ").append(orderBy.get()); - } + condition.ifPresent(s -> sql.append(" WHERE ").append(s)); + orderBy.ifPresent(s -> sql.append(" ORDER BY ").append(s)); return sql.toString(); } @@ -396,11 +395,54 @@ public class SqlServerUtils { sql.append(projection); sql.append(" FROM "); sql.append(quoteSchemaAndTable(tableId)); - if (condition.isPresent()) { - sql.append(" WHERE ").append(condition.get()); - } + condition.ifPresent(s -> sql.append(" WHERE ").append(s)); sql.append(" ORDER BY ").append(orderBy); sql.append(") T"); return sql.toString(); } + + public static int compare(Object obj1, Object obj2, Column splitColumn) { + if (splitColumn.typeName().equals(SqlServerTypeUtils.UNIQUEIDENTIFIRER)) { + return new SQLServerUUIDComparator() + .compare(UUID.fromString(obj1.toString()), UUID.fromString(obj2.toString())); + } + return ObjectUtils.compare(obj1, obj2); + } + + /** + * Comparator for SQL Server UUIDs. SQL Server compares UUIDs in a different order than Java. + * Reference code: SQLGuid.cs::CompareTo + * Reference doc: Comparing + * GUID and uniqueidentifier values + */ + static class SQLServerUUIDComparator implements Comparator { + + private static final int SIZE_OF_GUID = 16; + private static final byte[] GUID_ORDER = { + 10, 11, 12, 13, 14, 15, 8, 9, 6, 7, 4, 5, 0, 1, 2, 3 + }; + + public int compare(UUID uuid1, UUID uuid2) { + byte[] bytes1 = uuidToBytes(uuid1); + byte[] bytes2 = uuidToBytes(uuid2); + + for (int i = 0; i < SIZE_OF_GUID; i++) { + byte b1 = bytes1[GUID_ORDER[i]]; + byte b2 = bytes2[GUID_ORDER[i]]; + if (b1 != b2) { + return (b1 & 0xFF) - (b2 & 0xFF); // Unsigned byte comparison + } + } + return 0; + } + + private byte[] uuidToBytes(UUID uuid) { + ByteBuffer bb = ByteBuffer.wrap(new byte[16]); + bb.putLong(uuid.getMostSignificantBits()); + bb.putLong(uuid.getLeastSignificantBits()); + return bb.array(); + } + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SQLServerUUIDComparatorTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SQLServerUUIDComparatorTest.java new file mode 100644 index 000000000..89a292777 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SQLServerUUIDComparatorTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.sqlserver.source.utils; + +import org.apache.flink.cdc.connectors.base.utils.ObjectUtils; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Unit test for {@link SqlServerUtils.SQLServerUUIDComparator}. * */ +public class SQLServerUUIDComparatorTest { + + @Test + public void testComparator() { + SqlServerUtils.SQLServerUUIDComparator comparator = + new SqlServerUtils.SQLServerUUIDComparator(); + // Create an ArrayList and fill it with Guid values. + List guidList = new ArrayList<>(); + guidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE")); + guidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE")); + guidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE")); + + // Sort the Guids. + guidList.sort(ObjectUtils::compare); + + assertEquals( + guidList.get(0).toString().toUpperCase(), "1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"); + assertEquals( + guidList.get(1).toString().toUpperCase(), "2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"); + assertEquals( + guidList.get(2).toString().toUpperCase(), "3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"); + + // Create an ArrayList of SqlGuids. + List sqlGuidList = new ArrayList<>(); + sqlGuidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE")); + sqlGuidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE")); + sqlGuidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE")); + + // Sort the SqlGuids. The unsorted SqlGuids are in the same order + // as the unsorted Guid values. + sqlGuidList.sort(comparator); + + // Display the sorted SqlGuids. The sorted SqlGuid values are ordered + // differently than the Guid values. + assertEquals( + sqlGuidList.get(0).toString().toUpperCase(), + "2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"); + assertEquals( + sqlGuidList.get(1).toString().toUpperCase(), + "3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"); + assertEquals( + sqlGuidList.get(2).toString().toUpperCase(), + "1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"); + } +}