@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData ;
import org.apache.flink.types.Row ;
import org.apache.flink.util.CloseableIterator ;
import org.apache.flink.util.ExceptionUtils ;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig ;
import com.ververica.cdc.connectors.base.options.StartupOptions ;
@ -60,6 +61,7 @@ import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING ;
import static org.apache.flink.table.catalog.Column.physical ;
import static org.apache.flink.util.Preconditions.checkState ;
import static org.junit.Assert.assertTrue ;
/** IT tests for {@link OracleSourceBuilder.OracleIncrementalSource}. */
public class OracleSourceITCase extends OracleSourceTestBase {
@ -127,7 +129,9 @@ public class OracleSourceITCase extends OracleSourceTestBase {
FailoverType . TM ,
FailoverPhase . SNAPSHOT ,
new String [ ] { "CUSTOMERS" } ,
true ) ;
true ,
RestartStrategies . fixedDelayRestart ( 1 , 0 ) ,
null ) ;
}
@Test
@ -302,6 +306,29 @@ public class OracleSourceITCase extends OracleSourceTestBase {
assertEqualsInAnyOrder ( expectedRecords , records ) ;
}
@Test
public void testTableWithChunkColumnOfNoPrimaryKey ( ) {
String chunkColumn = "NAME" ;
try {
testOracleParallelSource (
1 ,
FailoverType . NONE ,
FailoverPhase . NEVER ,
new String [ ] { "CUSTOMERS" } ,
false ,
RestartStrategies . noRestart ( ) ,
chunkColumn ) ;
} catch ( Exception e ) {
assertTrue (
ExceptionUtils . findThrowableWithMessage (
e ,
String . format (
"Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s." ,
chunkColumn , "ID" , "customer.DEBEZIUM.CUSTOMERS" ) )
. isPresent ( ) ) ;
}
}
private List < String > testBackfillWhenWritingEvents (
boolean skipSnapshotBackfill , int fetchSize , int hookType ) throws Exception {
createAndInitialize ( "customer.sql" ) ;
@ -400,7 +427,13 @@ public class OracleSourceITCase extends OracleSourceTestBase {
String [ ] captureCustomerTables )
throws Exception {
testOracleParallelSource (
parallelism , failoverType , failoverPhase , captureCustomerTables , false ) ;
parallelism ,
failoverType ,
failoverPhase ,
captureCustomerTables ,
false ,
RestartStrategies . fixedDelayRestart ( 1 , 0 ) ,
null ) ;
}
private void testOracleParallelSource (
@ -408,7 +441,9 @@ public class OracleSourceITCase extends OracleSourceTestBase {
FailoverType failoverType ,
FailoverPhase failoverPhase ,
String [ ] captureCustomerTables ,
boolean skipSnapshotBackfill )
boolean skipSnapshotBackfill ,
RestartStrategies . RestartStrategyConfiguration restartStrategyConfiguration ,
String chunkColumn )
throws Exception {
createAndInitialize ( "customer.sql" ) ;
StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ( ) ;
@ -416,7 +451,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
env . setParallelism ( parallelism ) ;
env . enableCheckpointing ( 200L ) ;
env . setRestartStrategy ( RestartStrategies. fixedDelayRestart ( 1 , 0 ) ) ;
env . setRestartStrategy ( restartStrategyConfiguration ) ;
String sourceDDL =
format (
@ -439,6 +474,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ "%s"
+ ")" ,
ORACLE_CONTAINER . getHost ( ) ,
ORACLE_CONTAINER . getOraclePort ( ) ,
@ -447,7 +483,12 @@ public class OracleSourceITCase extends OracleSourceTestBase {
ORACLE_DATABASE ,
ORACLE_SCHEMA ,
getTableNameRegex ( captureCustomerTables ) , // (customer|customer_1)
skipSnapshotBackfill ) ;
skipSnapshotBackfill ,
chunkColumn = = null
? ""
: ",'scan.incremental.snapshot.chunk.key-column'='"
+ chunkColumn
+ "'" ) ;
// first step: check the snapshot data
String [ ] snapshotForSingleTable =