[FLINK-36793][cdc-source-connectors] Fix the chunk splitter logic of the Oracle CDC incremental snapshot, which caused snapshot splits to be too large

pull/3841/head
linjc13 3 weeks ago
parent 98f9f15132
commit 1f912900e6

@ -261,12 +261,16 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
}
/** ChunkEnd less than or equal to max. */
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
protected boolean isChunkEndLeMax(
JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
throws SQLException {
return ObjectUtils.compare(chunkEnd, max) <= 0;
}
/** ChunkEnd greater than or equal to max. */
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
protected boolean isChunkEndGeMax(
JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
throws SQLException {
return ObjectUtils.compare(chunkEnd, max) >= 0;
}
@ -389,7 +393,8 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
chunkSize);
// may sleep a while to avoid DDOS on MySQL server
maySleep(nextChunkId, tableId);
if (chunkEnd != null && isChunkEndLeMax(chunkEnd, minMaxOfSplitColumn[1], splitColumn)) {
if (chunkEnd != null
&& isChunkEndLeMax(jdbcConnection, chunkEnd, minMaxOfSplitColumn[1], splitColumn)) {
nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
return createSnapshotSplit(tableId, nextChunkId++, splitType, chunkStartVal, chunkEnd);
} else {
@ -489,7 +494,7 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) {
while (chunkEnd != null && isChunkEndLeMax(jdbcConnection, chunkEnd, max, splitColumn)) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on PostgreSQL server
@ -518,7 +523,7 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
if (isChunkEndGeMax(jdbc, chunkEnd, max, splitColumn)) {
return null;
} else {
return chunkEnd;

@ -106,12 +106,29 @@ public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
/** ChunkEnd less than or equal to max. */
@Override
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
protected boolean isChunkEndLeMax(
JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
throws SQLException {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) max).getBytes())
<= 0;
String query =
String.format(
"SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS ASC");
return jdbc.prepareQueryAndMap(
query,
ps -> {
ps.setObject(1, chunkEnd.toString());
ps.setObject(2, max.toString());
},
rs -> {
if (rs.next()) {
Object obj = rs.getObject(1);
return obj.toString().equals(chunkEnd.toString())
|| chunkEnd.toString().equals(max.toString());
} else {
throw new RuntimeException("compare rowid error");
}
});
} else {
chunkEndMaxCompare = chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0;
}
@ -120,12 +137,29 @@ public class OracleChunkSplitter extends JdbcSourceChunkSplitter {
/** ChunkEnd greater than or equal to max. */
@Override
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
protected boolean isChunkEndGeMax(
JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
throws SQLException {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) max).getBytes())
>= 0;
String query =
String.format(
"SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS DESC");
return jdbc.prepareQueryAndMap(
query,
ps -> {
ps.setObject(1, chunkEnd.toString());
ps.setObject(2, max.toString());
},
rs -> {
if (rs.next()) {
Object obj = rs.getObject(1);
return obj.toString().equals(chunkEnd.toString())
|| chunkEnd.toString().equals(max.toString());
} else {
throw new RuntimeException("compare rowid error");
}
});
} else {
chunkEndMaxCompare = chunkEnd != null && ObjectUtils.compare(chunkEnd, max) >= 0;
}

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter;
import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import oracle.sql.ROWID;
import org.junit.jupiter.api.Test;
import java.sql.SQLException;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Integration tests for the ROWID boundary comparisons of {@link OracleChunkSplitter}.
 *
 * <p>Each case opens a JDBC connection to the Oracle container exposed by {@link
 * OracleSourceTestBase}, passes it to the splitter together with two ROWID values, and closes
 * the connection afterwards.
 */
class OracleChunkSplitterTest extends OracleSourceTestBase {

    /**
     * ROWID whose relative-file-number segment is {@code ACK}.
     *
     * <p>In Oracle's base-64 ROWID alphabet (A-Z, a-z, 0-9, +, /) {@code K} is a lower digit
     * than {@code /}, so this value is expected to sort before {@link #LARGER_ROWID} even though
     * a raw byte comparison of the two strings gives the opposite answer.
     */
    private static final String SMALLER_ROWID = "AAAzIdACKAAABWCAAA";

    /** ROWID whose relative-file-number segment is {@code AC/}. */
    private static final String LARGER_ROWID = "AAAzIdAC/AACWIPAAB";

    /** A chunk end that sorts before max must not be reported as "greater than or equal". */
    @Test
    void testIsChunkEndGeMax_Rowid_Case() throws SQLException {
        // The same pair cannot satisfy both "<=" and ">=" unless the values are equal,
        // so the expectation here is the opposite of the Le case below.
        rowidGeMaxCheck(SMALLER_ROWID, LARGER_ROWID, false);
    }

    /** A chunk end that sorts before max must be reported as "less than or equal". */
    @Test
    void testIsChunkEndLeMax_Rowid_Case() throws SQLException {
        rowidLeMaxCheck(SMALLER_ROWID, LARGER_ROWID, true);
    }

    /**
     * Asserts the result of {@code isChunkEndGeMax} for the given pair of ROWID strings.
     *
     * @param chunkEndStr textual ROWID used as the chunk end
     * @param maxStr textual ROWID used as the maximum
     * @param expected the result the splitter is expected to return
     * @throws SQLException if the comparison or closing the connection fails
     */
    private void rowidGeMaxCheck(String chunkEndStr, String maxStr, boolean expected)
            throws SQLException {
        // try-with-resources so the connection is released even when the assertion fails
        try (JdbcConnection jdbc =
                OracleConnectionUtils.createOracleConnection(containerJdbcConfig())) {
            boolean actual =
                    newSplitter()
                            .isChunkEndGeMax(
                                    jdbc, new ROWID(chunkEndStr), new ROWID(maxStr), null);
            assertTrue(
                    actual == expected,
                    "isChunkEndGeMax(" + chunkEndStr + ", " + maxStr + ") should be " + expected);
        }
    }

    /**
     * Asserts the result of {@code isChunkEndLeMax} for the given pair of ROWID strings.
     *
     * @param chunkEndStr textual ROWID used as the chunk end
     * @param maxStr textual ROWID used as the maximum
     * @param expected the result the splitter is expected to return
     * @throws SQLException if the comparison or closing the connection fails
     */
    private void rowidLeMaxCheck(String chunkEndStr, String maxStr, boolean expected)
            throws SQLException {
        // try-with-resources so the connection is released even when the assertion fails
        try (JdbcConnection jdbc =
                OracleConnectionUtils.createOracleConnection(containerJdbcConfig())) {
            boolean actual =
                    newSplitter()
                            .isChunkEndLeMax(
                                    jdbc, new ROWID(chunkEndStr), new ROWID(maxStr), null);
            assertTrue(
                    actual == expected,
                    "isChunkEndLeMax(" + chunkEndStr + ", " + maxStr + ") should be " + expected);
        }
    }

    /** Builds the JDBC configuration that points at the shared Oracle test container. */
    private static JdbcConfiguration containerJdbcConfig() {
        return JdbcConfiguration.create()
                .with(JdbcConfiguration.HOSTNAME, ORACLE_CONTAINER.getHost())
                .with(JdbcConfiguration.PORT, ORACLE_CONTAINER.getOraclePort())
                .with(JdbcConfiguration.USER, ORACLE_CONTAINER.getUsername())
                .with(JdbcConfiguration.PASSWORD, ORACLE_CONTAINER.getPassword())
                .with(JdbcConfiguration.DATABASE, ORACLE_CONTAINER.getDatabaseName())
                .build();
    }

    /**
     * Creates a splitter with no source configuration or dialect; the tests only call the
     * ROWID comparison methods and pass the connection explicitly.
     */
    private static OracleChunkSplitter newSplitter() {
        ChunkSplitterState chunkSplitterState = new ChunkSplitterState(null, null, null);
        return new OracleChunkSplitter(null, null, chunkSplitterState);
    }
}

@ -69,12 +69,14 @@ public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter {
return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
}
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
protected boolean isChunkEndLeMax(
JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn) {
return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0;
}
/** ChunkEnd greater than or equal to max. */
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
protected boolean isChunkEndGeMax(
JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn) {
return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0;
}
}

Loading…
Cancel
Save