|
|
|
@ -356,6 +356,10 @@ public class OracleConnectorITCase {
|
|
|
|
|
public void testConsumingAllEventsByChunkKeyColumn() throws Exception {
|
|
|
|
|
|
|
|
|
|
createAndInitialize("product.sql");
|
|
|
|
|
try (Connection dbaConnection = getJdbcConnectionAsDBA();
|
|
|
|
|
Statement dbaStatement = dbaConnection.createStatement()) {
|
|
|
|
|
dbaStatement.execute("GRANT ANALYZE ANY TO " + ORACLE_CONTAINER.getUsername());
|
|
|
|
|
}
|
|
|
|
|
if (!parallelismSnapshot) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -383,8 +387,8 @@ public class OracleConnectorITCase {
|
|
|
|
|
+ ")",
|
|
|
|
|
ORACLE_CONTAINER.getHost(),
|
|
|
|
|
ORACLE_CONTAINER.getOraclePort(),
|
|
|
|
|
"dbzuser",
|
|
|
|
|
"dbz",
|
|
|
|
|
ORACLE_CONTAINER.getUsername(),
|
|
|
|
|
ORACLE_CONTAINER.getPassword(),
|
|
|
|
|
parallelismSnapshot,
|
|
|
|
|
"debezium",
|
|
|
|
|
"products");
|
|
|
|
@ -985,4 +989,70 @@ public class OracleConnectorITCase {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testCompositePkTableSplitsUnevenlyWithChunkKeyColumn() throws Exception {
|
|
|
|
|
if (parallelismSnapshot) {
|
|
|
|
|
testUseChunkColumn("PRODUCT_KIND");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testCompositePkTableSplitsEvenlyWithChunkKeyColumn() throws Exception {
|
|
|
|
|
if (parallelismSnapshot) {
|
|
|
|
|
testUseChunkColumn("PRODUCT_NO");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void testUseChunkColumn(String chunkColumn) throws Exception {
|
|
|
|
|
createAndInitialize("customer.sql");
|
|
|
|
|
try (Connection dbaConnection = getJdbcConnectionAsDBA();
|
|
|
|
|
Statement dbaStatement = dbaConnection.createStatement()) {
|
|
|
|
|
dbaStatement.execute("GRANT ANALYZE ANY TO " + ORACLE_CONTAINER.getUsername());
|
|
|
|
|
}
|
|
|
|
|
String sourceDDL =
|
|
|
|
|
String.format(
|
|
|
|
|
"CREATE TABLE evenly_shopping_cart (\n"
|
|
|
|
|
+ " PRODUCT_NO INT NOT NULL,\n"
|
|
|
|
|
+ " PRODUCT_KIND VARCHAR(255),\n"
|
|
|
|
|
+ " USER_ID VARCHAR(255) NOT NULL,\n"
|
|
|
|
|
+ " DESCRIPTION VARCHAR(255) NOT NULL\n"
|
|
|
|
|
+ ") WITH ("
|
|
|
|
|
+ " 'connector' = 'oracle-cdc',"
|
|
|
|
|
+ " 'hostname' = '%s',"
|
|
|
|
|
+ " 'port' = '%s',"
|
|
|
|
|
+ " 'username' = '%s',"
|
|
|
|
|
+ " 'password' = '%s',"
|
|
|
|
|
+ " 'scan.incremental.snapshot.enabled' = '%s',"
|
|
|
|
|
+ " 'scan.incremental.snapshot.chunk.key-column' = '%s',"
|
|
|
|
|
+ " 'scan.incremental.snapshot.chunk.size' = '%s',"
|
|
|
|
|
+ " 'database-name' = '%s',"
|
|
|
|
|
+ " 'schema-name' = '%s',"
|
|
|
|
|
+ " 'table-name' = '%s'"
|
|
|
|
|
+ ")",
|
|
|
|
|
ORACLE_CONTAINER.getHost(),
|
|
|
|
|
ORACLE_CONTAINER.getOraclePort(),
|
|
|
|
|
ORACLE_CONTAINER.getUsername(),
|
|
|
|
|
ORACLE_CONTAINER.getPassword(),
|
|
|
|
|
parallelismSnapshot,
|
|
|
|
|
chunkColumn,
|
|
|
|
|
4,
|
|
|
|
|
"ORCLCDB",
|
|
|
|
|
"DEBEZIUM",
|
|
|
|
|
"EVENLY_SHOPPING_CART");
|
|
|
|
|
String sinkDDL =
|
|
|
|
|
"CREATE TABLE sink "
|
|
|
|
|
+ " WITH ("
|
|
|
|
|
+ " 'connector' = 'values',"
|
|
|
|
|
+ " 'sink-insert-only' = 'false'"
|
|
|
|
|
+ ") LIKE evenly_shopping_cart (EXCLUDING OPTIONS)";
|
|
|
|
|
|
|
|
|
|
tEnv.executeSql(sourceDDL);
|
|
|
|
|
tEnv.executeSql(sinkDDL);
|
|
|
|
|
|
|
|
|
|
// async submit job
|
|
|
|
|
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM evenly_shopping_cart");
|
|
|
|
|
waitForSinkSize("sink", 12);
|
|
|
|
|
result.getJobClient().get().cancel().get();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|