[FLINK-34883] Fix postgres uuid column as PK (#3282)

* [FLINK-34883] Fix postgres uuid column as PK

* [FLINK-34883] Fix column comment.
pull/3478/head
Joao Boto 7 months ago committed by GitHub
parent 11deb62637
commit 94a0415475
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -47,11 +47,10 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column the column whose minimum and maximum values are queried.
* @return maximum and minimum value.
*/
Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException;
Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) throws SQLException;
/**
* Query the minimum value of the column in the table, and the minimum value must be greater than
@ -60,12 +59,11 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column the column whose minimum value is queried.
* @param excludedLowerBound the minimum value should be greater than this value.
* @return minimum value.
*/
Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
Object queryMin(JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException;
/**
@ -75,7 +73,7 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column the column whose next chunk end value is queried.
* @param chunkSize chunk size.
* @param includedLowerBound the previous chunk end value.
* @return next chunk end value.
@ -83,7 +81,7 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException;

@ -110,28 +110,28 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
}
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return MySqlUtils.queryMinMax(jdbc, tableId, columnName);
return MySqlUtils.queryMinMax(jdbc, tableId, column.name());
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return MySqlUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
return MySqlUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}
@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return MySqlUtils.queryNextChunkMax(
jdbc, tableId, columnName, chunkSize, includedLowerBound);
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
}
@Override
@ -161,8 +161,7 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
@ -189,11 +188,10 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
@ -241,7 +239,7 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
@ -250,7 +248,7 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
@ -258,7 +256,7 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
@ -269,17 +267,17 @@ public class MySqlChunkSplitter implements JdbcSourceChunkSplitter {
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;

@ -127,16 +127,16 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
}
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return Db2Utils.queryMinMax(jdbc, tableId, columnName);
return Db2Utils.queryMinMax(jdbc, tableId, column.name());
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return Db2Utils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
return Db2Utils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}
@Override
@ -152,11 +152,12 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return Db2Utils.queryNextChunkMax(jdbc, tableId, columnName, chunkSize, includedLowerBound);
return Db2Utils.queryNextChunkMax(
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
}
@Override
@ -177,8 +178,7 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
@ -205,11 +205,10 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
@ -259,7 +258,7 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
@ -268,7 +267,7 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
@ -276,7 +275,7 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
// may sleep awhile to avoid DDOS on Db2 server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
@ -287,17 +286,17 @@ public class Db2ChunkSplitter implements JdbcSourceChunkSplitter {
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;

@ -114,28 +114,28 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
}
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return OracleUtils.queryMinMax(jdbc, tableId, columnName);
return OracleUtils.queryMinMax(jdbc, tableId, column.name());
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return OracleUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
return OracleUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}
@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return OracleUtils.queryNextChunkMax(
jdbc, tableId, columnName, chunkSize, includedLowerBound);
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
}
@Override
@ -165,8 +165,7 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
@ -180,7 +179,7 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
// use ROWID get splitUnevenlySizedChunks by default
if (splitColumn.name().equals(ROWID.class.getSimpleName())) {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
if (isEvenlySplitColumn(splitColumn)) {
@ -198,11 +197,10 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
@ -239,7 +237,7 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
@ -248,7 +246,7 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
@ -257,7 +255,7 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
// may sleep a while to avoid DDOS on Oracle server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
@ -294,17 +292,17 @@ public class OracleChunkSplitter implements JdbcSourceChunkSplitter {
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (isChunkEndGeMax(chunkEnd, max)) {
return null;

@ -73,7 +73,7 @@ limitations under the License.
<!-- fix CVE-2022-26520 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520 -->
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.1</version>
<version>42.7.3</version>
</dependency>
<!-- test dependencies on Debezium -->

@ -127,28 +127,28 @@ public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
}
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column splitColumn)
throws SQLException {
return PostgresQueryUtils.queryMinMax(jdbc, tableId, columnName);
return PostgresQueryUtils.queryMinMax(jdbc, tableId, splitColumn);
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return PostgresQueryUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
return PostgresQueryUtils.queryMin(jdbc, tableId, column, excludedLowerBound);
}
@Override
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return PostgresQueryUtils.queryNextChunkMax(
jdbc, tableId, columnName, chunkSize, includedLowerBound);
jdbc, tableId, column, chunkSize, includedLowerBound);
}
// --------------------------------------------------------------------------------------------
@ -179,8 +179,7 @@ public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
@ -207,11 +206,10 @@ public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
@ -261,7 +259,7 @@ public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
@ -270,7 +268,7 @@ public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
@ -278,7 +276,7 @@ public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
// may sleep a while to avoid DDOS on PostgreSQL server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
@ -289,17 +287,17 @@ public class PostgresChunkSplitter implements JdbcSourceChunkSplitter {
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;

@ -53,7 +53,9 @@ import org.slf4j.LoggerFactory;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady;
import static io.debezium.connector.postgresql.Utils.refreshSchema;
@ -280,12 +282,18 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
snapshotSplit.splitId(),
table.id());
List<String> uuidFields =
snapshotSplit.getSplitKeyType().getFieldNames().stream()
.filter(field -> table.columnWithName(field).typeName().equals("uuid"))
.collect(Collectors.toList());
final String selectSql =
PostgresQueryUtils.buildSplitScanQuery(
snapshotSplit.getTableId(),
snapshotSplit.getSplitKeyType(),
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null);
snapshotSplit.getSplitEnd() == null,
uuidFields);
LOG.debug(
"For split '{}' of table {} using select statement: '{}'",
snapshotSplit.splitId(),

@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.postgres.source.utils;
import org.apache.flink.table.types.logical.RowType;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,7 +28,9 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@ -40,12 +43,12 @@ public class PostgresQueryUtils {
private PostgresQueryUtils() {}
public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
final String minMaxQuery =
String.format(
"SELECT MIN(%s), MAX(%s) FROM %s",
quote(columnName), quote(columnName), quote(tableId));
quoteForMinMax(column), quoteForMinMax(column), quote(tableId));
return jdbc.queryAndMap(
minMaxQuery,
rs -> {
@ -85,12 +88,15 @@ public class PostgresQueryUtils {
}
public static Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
final String query =
String.format(
"SELECT MIN(%s) FROM %s WHERE %s > ?",
quote(columnName), quote(tableId), quote(columnName));
"SELECT MIN(%s) FROM %s WHERE %s > %s",
quoteForMinMax(column),
quote(tableId),
quote(column.name()),
castParam(column));
return jdbc.prepareQueryAndMap(
query,
ps -> ps.setObject(1, excludedLowerBound),
@ -109,20 +115,21 @@ public class PostgresQueryUtils {
public static Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException {
String quotedColumn = quote(splitColumnName);
String quotedColumn = quote(splitColumn.name());
String query =
String.format(
"SELECT MAX(%s) FROM ("
+ "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
+ "SELECT %s FROM %s WHERE %s >= %s ORDER BY %s ASC LIMIT %s"
+ ") AS T",
quotedColumn,
quoteForMinMax(splitColumn),
quotedColumn,
quote(tableId),
quotedColumn,
castParam(splitColumn),
quotedColumn,
chunkSize);
return jdbc.prepareQueryAndMap(
@ -141,7 +148,17 @@ public class PostgresQueryUtils {
public static String buildSplitScanQuery(
TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) {
return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true);
return buildSplitScanQuery(
tableId, pkRowType, isFirstSplit, isLastSplit, new ArrayList<>());
}
/**
 * Builds the scan query for one snapshot split, with the names of the uuid-typed split key
 * fields supplied by the caller.
 *
 * <p>Delegates to {@code buildSplitQuery} with a limit size of {@code -1} and the
 * scanning-data flag set to {@code true}.
 *
 * @param tableId table to read from.
 * @param pkRowType row type describing the split key fields.
 * @param isFirstSplit whether this split is the first one.
 * @param isLastSplit whether this split is the last one.
 * @param uuidFields names of the split key fields whose column type is uuid.
 * @return the generated query string.
 */
public static String buildSplitScanQuery(
        TableId tableId,
        RowType pkRowType,
        boolean isFirstSplit,
        boolean isLastSplit,
        List<String> uuidFields) {
    final int limitSize = -1;
    final boolean scanningData = true;
    return buildSplitQuery(
            tableId, pkRowType, isFirstSplit, isLastSplit, uuidFields, limitSize, scanningData);
}
private static String buildSplitQuery(
@ -149,6 +166,7 @@ public class PostgresQueryUtils {
RowType pkRowType,
boolean isFirstSplit,
boolean isLastSplit,
List<String> uuidFields,
int limitSize,
boolean isScanningData) {
final String condition;
@ -157,27 +175,27 @@ public class PostgresQueryUtils {
condition = null;
} else if (isFirstSplit) {
final StringBuilder sql = new StringBuilder();
addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", uuidFields);
if (isScanningData) {
sql.append(" AND NOT (");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
sql.append(")");
}
condition = sql.toString();
} else if (isLastSplit) {
final StringBuilder sql = new StringBuilder();
addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ", uuidFields);
condition = sql.toString();
} else {
final StringBuilder sql = new StringBuilder();
addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ", uuidFields);
if (isScanningData) {
sql.append(" AND NOT (");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ", uuidFields);
sql.append(")");
}
sql.append(" AND ");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?");
addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ", uuidFields);
condition = sql.toString();
}
@ -237,6 +255,31 @@ public class PostgresQueryUtils {
return "\"" + dbOrTableName + "\"";
}
/**
 * Builds the column expression placed inside the MIN/MAX aggregates of the chunk queries.
 *
 * <p>The column name is quoted first; when the column is of type uuid the quoted name is
 * additionally wrapped in a cast to {@code text}.
 * NOTE(review): presumably required because PostgreSQL offers no MIN/MAX over uuid — confirm.
 *
 * @param column column to aggregate over.
 * @return the quoted (and, for uuid columns, text-cast) column expression.
 */
private static String quoteForMinMax(Column column) {
    final String quotedName = quote(column.name());
    if (isUUID(column)) {
        return castToText(quotedName);
    }
    return quotedName;
}
/**
 * Returns the JDBC parameter placeholder to use when comparing against the given column.
 *
 * @param column column the bound parameter is compared with.
 * @return the placeholder produced by {@code castParam(boolean)} for this column's uuid-ness.
 */
private static String castParam(Column column) {
    final boolean uuidColumn = isUUID(column);
    return castParam(uuidColumn);
}
/**
 * Returns the JDBC parameter placeholder, cast to uuid when requested.
 *
 * @param isUUID whether the placeholder is compared against a uuid column.
 * @return {@code "?"} wrapped in a uuid cast when {@code isUUID} is true, plain {@code "?"}
 *     otherwise.
 */
private static String castParam(boolean isUUID) {
    if (isUUID) {
        return castToUuid("?");
    }
    return "?";
}
/**
 * Wraps an SQL expression in a PostgreSQL cast to {@code uuid}.
 *
 * @param value SQL expression or placeholder to cast.
 * @return the expression in the form {@code (value)::uuid}.
 */
private static String castToUuid(String value) {
    return "(" + value + ")::uuid";
}
/**
 * Wraps an SQL expression in a PostgreSQL cast to {@code text}.
 *
 * @param value SQL expression to cast.
 * @return the expression in the form {@code (value)::text}.
 */
private static String castToText(String value) {
    return "(" + value + ")::text";
}
/**
 * Tells whether the given column is declared with the {@code uuid} type name.
 *
 * <p>The literal is the receiver of {@code equals} so that a column whose type name is
 * {@code null} is reported as non-uuid instead of raising a {@link NullPointerException}.
 *
 * @param column column to inspect.
 * @return {@code true} if the column's type name is exactly {@code "uuid"}.
 */
private static boolean isUUID(Column column) {
    return "uuid".equals(column.typeName());
}
/**
 * Renders the table identifier with each part quoted by double quotes.
 *
 * @param tableId table identifier to render.
 * @return the result of {@code tableId.toQuotedString('"')}.
 */
public static String quote(TableId tableId) {
    final char quotingChar = '"';
    return tableId.toQuotedString(quotingChar);
}
@ -251,10 +294,12 @@ public class PostgresQueryUtils {
}
private static void addPrimaryKeyColumnsToCondition(
RowType pkRowType, StringBuilder sql, String predicate) {
RowType pkRowType, StringBuilder sql, String predicate, List<String> uuidFields) {
for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
fieldNamesIt.hasNext(); ) {
sql.append(fieldNamesIt.next()).append(predicate);
String fieldName = fieldNamesIt.next();
boolean isUUID = uuidFields.contains(fieldName);
sql.append(fieldName).append(predicate).append(castParam(isUUID));
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}

@ -60,6 +60,7 @@ public class PostgresTypeUtils {
private static final String PG_CHARACTER_ARRAY = "_character";
private static final String PG_CHARACTER_VARYING = "varchar";
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
private static final String PG_UUID = "uuid";
/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
@ -136,6 +137,7 @@ public class PostgresTypeUtils {
case PG_CHARACTER_VARYING_ARRAY:
return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
case PG_TEXT:
case PG_UUID:
return DataTypes.STRING();
case PG_TEXT_ARRAY:
return DataTypes.ARRAY(DataTypes.STRING());

@ -129,16 +129,16 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
}
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return SqlServerUtils.queryMinMax(jdbc, tableId, columnName);
return SqlServerUtils.queryMinMax(jdbc, tableId, column.name());
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return SqlServerUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound);
return SqlServerUtils.queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}
@Override
@ -154,12 +154,12 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
public Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException {
return SqlServerUtils.queryNextChunkMax(
jdbc, tableId, columnName, chunkSize, includedLowerBound);
jdbc, tableId, column.name(), chunkSize, includedLowerBound);
}
@Override
@ -180,8 +180,7 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
*/
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
@ -208,11 +207,10 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
return splitEvenlySizedChunks(
tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
@ -262,7 +260,7 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
private List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
@ -271,7 +269,7 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
@ -279,7 +277,7 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
// may sleep awhile to avoid DDOS on SqlServer server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
@ -290,17 +288,17 @@ public class SqlServerChunkSplitter implements JdbcSourceChunkSplitter {
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
return null;

@ -36,7 +36,7 @@ limitations under the License.
<jdbc.version-1.18>3.1.2-1.18</jdbc.version-1.18>
<jdbc.version-1.19>3.1.2-1.18</jdbc.version-1.19>
<mysql.driver.version>8.0.27</mysql.driver.version>
<postgresql.driver.version>42.5.1</postgresql.driver.version>
<postgresql.driver.version>42.7.3</postgresql.driver.version>
</properties>
<dependencies>

Loading…
Cancel
Save