[FLINK-34990][cdc-connector][oracle] Oracle cdc support newly add table (#3203)

* [cdc-connector][oracle] Oracle cdc support newly add table

* [cdc-connector][oracle] Fix code style

* [cdc-connector][oracle] Address comment
pull/3475/head
Xin Gong 7 months ago committed by GitHub
parent 137dc1b2e6
commit 2d1eb0aff1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -243,6 +243,12 @@ public class OracleSourceBuilder<T> {
return this;
}
/** Whether the {@link OracleIncrementalSource} should scan the newly added tables or not. */
public OracleSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
return this;
}
/**
* Build the {@link OracleIncrementalSource}.
*

@ -63,7 +63,8 @@ public class OracleSourceConfig extends JdbcSourceConfig {
int connectMaxRetries,
int connectionPoolSize,
String chunkKeyColumn,
boolean skipSnapshotBackfill) {
boolean skipSnapshotBackfill,
boolean scanNewlyAddedTableEnabled) {
super(
startupOptions,
databaseList,
@ -89,7 +90,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
connectionPoolSize,
chunkKeyColumn,
skipSnapshotBackfill,
false);
scanNewlyAddedTableEnabled);
this.url = url;
}

@ -133,6 +133,7 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory {
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn,
skipSnapshotBackfill);
skipSnapshotBackfill,
scanNewlyAddedTableEnabled);
}
}

@ -79,6 +79,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
private final String chunkKeyColumn;
private final boolean closeIdleReaders;
private final boolean skipSnapshotBackfill;
private final boolean scanNewlyAddedTableEnabled;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@ -113,7 +114,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
double distributionFactorLower,
@Nullable String chunkKeyColumn,
boolean closeIdleReaders,
boolean skipSnapshotBackfill) {
boolean skipSnapshotBackfill,
boolean scanNewlyAddedTableEnabled) {
this.physicalSchema = physicalSchema;
this.url = url;
this.port = port;
@ -139,6 +141,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
this.chunkKeyColumn = chunkKeyColumn;
this.closeIdleReaders = closeIdleReaders;
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
}
@Override
@ -187,6 +190,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
.closeIdleReaders(closeIdleReaders)
.skipSnapshotBackfill(skipSnapshotBackfill)
.chunkKeyColumn(chunkKeyColumn)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.build();
return SourceProvider.of(oracleChangeEventSource);
@ -252,7 +256,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
distributionFactorLower,
chunkKeyColumn,
closeIdleReaders,
skipSnapshotBackfill);
skipSnapshotBackfill,
scanNewlyAddedTableEnabled);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@ -291,7 +296,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
&& Objects.equals(distributionFactorLower, that.distributionFactorLower)
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(closeIdleReaders, that.closeIdleReaders)
&& Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill);
&& Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill)
&& Objects.equals(scanNewlyAddedTableEnabled, that.scanNewlyAddedTableEnabled);
}
@Override
@ -321,7 +327,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
distributionFactorLower,
chunkKeyColumn,
closeIdleReaders,
skipSnapshotBackfill);
skipSnapshotBackfill,
scanNewlyAddedTableEnabled);
}
@Override

@ -47,6 +47,7 @@ import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_M
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@ -106,6 +107,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@ -142,7 +144,8 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
distributionFactorLower,
chunkKeyColumn,
closeIdlerReaders,
skipSnapshotBackfill);
skipSnapshotBackfill,
scanNewlyAddedTableEnabled);
}
@Override
@ -180,6 +183,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
return options;
}

@ -0,0 +1,908 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oracle.source;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverPhase;
import org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverType;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.ExceptionUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.getTableNameRegex;
import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover;
import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForSinkSize;
import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForUpsertSinkSize;
/** IT tests to cover various newly added tables during capture process. */
public class NewlyAddedTableITCase extends OracleSourceTestBase {
@Rule public final Timeout timeoutPerTest = Timeout.seconds(600);
private final ScheduledExecutorService mockRedoLogExecutor =
Executors.newScheduledThreadPool(1);
@BeforeClass
public static void beforeClass() throws SQLException {
try (Connection dbaConnection = getJdbcConnectionAsDBA();
Statement dbaStatement = dbaConnection.createStatement()) {
dbaStatement.execute("ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
}
}
@Before
public void before() throws Exception {
TestValuesTableFactory.clearAllData();
createAndInitialize("customer.sql");
try (Connection connection = getJdbcConnection()) {
Statement statement = connection.createStatement();
connection.setAutoCommit(false);
// prepare initial data for given table
String tableId = ORACLE_SCHEMA + ".PRODUCE_LOG_TABLE";
statement.execute(
format(
"CREATE TABLE %s ( ID NUMBER(19), CNT NUMBER(19), PRIMARY KEY(ID))",
tableId));
statement.execute(format("INSERT INTO %s VALUES (0, 100)", tableId));
statement.execute(format("INSERT INTO %s VALUES (1, 101)", tableId));
statement.execute(format("INSERT INTO %s VALUES (2, 102)", tableId));
connection.commit();
// mock continuous redo log during the newly added table capturing process
mockRedoLogExecutor.schedule(
() -> {
try {
executeSql(format("UPDATE %s SET CNT = CNT +1 WHERE ID < 2", tableId));
} catch (Exception e) {
e.printStackTrace();
}
},
500,
TimeUnit.MICROSECONDS);
}
}
@After
public void after() throws Exception {
mockRedoLogExecutor.shutdown();
// sleep 1000ms to wait until connections are closed.
Thread.sleep(1000L);
}
@Test
public void testNewlyAddedTableForExistsPipelineOnce() throws Exception {
testNewlyAddedTableOneByOne(
1,
FailoverType.NONE,
FailoverPhase.NEVER,
false,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING");
}
@Test
public void testNewlyAddedTableForExistsPipelineOnceWithAheadRedoLog() throws Exception {
testNewlyAddedTableOneByOne(
1,
FailoverType.NONE,
FailoverPhase.NEVER,
true,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING");
}
@Test
public void testNewlyAddedTableForExistsPipelineTwice() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.NONE,
FailoverPhase.NEVER,
false,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI");
}
@Test
public void testNewlyAddedTableForExistsPipelineTwiceWithAheadRedoLog() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.NONE,
FailoverPhase.NEVER,
true,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI");
}
@Test
public void testNewlyAddedTableForExistsPipelineTwiceWithAheadRedoLogAndAutoCloseReader()
throws Exception {
Map<String, String> otherOptions = new HashMap<>();
otherOptions.put("scan.incremental.close-idle-reader.enabled", "true");
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
otherOptions,
FailoverType.NONE,
FailoverPhase.NEVER,
true,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI");
}
@Test
public void testNewlyAddedTableForExistsPipelineThrice() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.NONE,
FailoverPhase.NEVER,
false,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI",
"ADDRESS_SHENZHEN");
}
@Test
public void testNewlyAddedTableForExistsPipelineThriceWithAheadRedoLog() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.NONE,
FailoverPhase.NEVER,
true,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI",
"ADDRESS_SHENZHEN");
}
@Test
public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws Exception {
testNewlyAddedTableOneByOne(
1,
FailoverType.NONE,
FailoverPhase.NEVER,
false,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING");
}
@Test
public void testNewlyAddedTableForExistsPipelineSingleParallelismWithAheadRedoLog()
throws Exception {
testNewlyAddedTableOneByOne(
1,
FailoverType.NONE,
FailoverPhase.NEVER,
true,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING");
}
@Test
public void testJobManagerFailoverForNewlyAddedTable() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.JM,
FailoverPhase.SNAPSHOT,
false,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING");
}
@Test
public void testJobManagerFailoverForNewlyAddedTableWithAheadRedoLog() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.JM,
FailoverPhase.SNAPSHOT,
true,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING");
}
@Test
public void testTaskManagerFailoverForNewlyAddedTable() throws Exception {
testNewlyAddedTableOneByOne(
1,
FailoverType.TM,
FailoverPhase.REDO_LOG,
false,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING");
}
@Test
public void testTaskManagerFailoverForNewlyAddedTableWithAheadRedoLog() throws Exception {
testNewlyAddedTableOneByOne(
1,
FailoverType.TM,
FailoverPhase.REDO_LOG,
false,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING");
}
@Test
public void testJobManagerFailoverForRemoveTableSingleParallelism() throws Exception {
testRemoveTablesOneByOne(
1,
FailoverType.JM,
FailoverPhase.SNAPSHOT,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI");
}
@Test
public void testJobManagerFailoverForRemoveTable() throws Exception {
testRemoveTablesOneByOne(
DEFAULT_PARALLELISM,
FailoverType.JM,
FailoverPhase.SNAPSHOT,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI");
}
@Test
public void testTaskManagerFailoverForRemoveTableSingleParallelism() throws Exception {
testRemoveTablesOneByOne(
1,
FailoverType.TM,
FailoverPhase.SNAPSHOT,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI");
}
@Test
public void testTaskManagerFailoverForRemoveTable() throws Exception {
testRemoveTablesOneByOne(
DEFAULT_PARALLELISM,
FailoverType.TM,
FailoverPhase.SNAPSHOT,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI");
}
@Test
public void testRemoveTableSingleParallelism() throws Exception {
testRemoveTablesOneByOne(
1,
FailoverType.NONE,
FailoverPhase.NEVER,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI");
}
@Test
public void testRemoveTable() throws Exception {
testRemoveTablesOneByOne(
DEFAULT_PARALLELISM,
FailoverType.NONE,
FailoverPhase.NEVER,
"ADDRESS_HANGZHOU",
"ADDRESS_BEIJING",
"ADDRESS_SHANGHAI");
}
@Test
public void testRemoveAndAddTablesOneByOne() throws Exception {
testRemoveAndAddTablesOneByOne(
1, "ADDRESS_HANGZHOU", "ADDRESS_BEIJING", "ADDRESS_SHANGHAI");
}
private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
throws Exception {
Connection connection = getJdbcConnection();
// step 1: create tables with all tables included
initialAddressTables(connection, captureAddressTables);
final TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
// get all expected data
List<String> fetchedDataList = new ArrayList<>();
String finishedSavePointPath = null;
// test removing and adding table one by one
for (int round = 0; round < captureAddressTables.length; round++) {
String captureTableThisRound = captureAddressTables[round];
String cityName = captureTableThisRound.split("_")[1];
StreamExecutionEnvironment env =
getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement =
getCreateTableStatement(new HashMap<>(), captureTableThisRound);
tEnv.executeSql(createTableStatement);
tEnv.executeSql(
"CREATE TABLE sink ("
+ " TABLE_NAME STRING,"
+ " ID BIGINT,"
+ " COUNTRY STRING,"
+ " CITY STRING,"
+ " DETAIL_ADDRESS STRING,"
+ " primary key (CITY, ID) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")");
TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
JobClient jobClient = tableResult.getJobClient().get();
// this round's snapshot data
fetchedDataList.addAll(
Arrays.asList(
format(
"+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
captureTableThisRound, cityName, cityName),
format(
"+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
captureTableThisRound, cityName, cityName),
format(
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
captureTableThisRound, cityName, cityName)));
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
// step 2: make redo log data for all tables before this round(also includes this
// round),
// test whether only this round table's data is captured.
for (int i = 0; i <= round; i++) {
String tableName = captureAddressTables[i];
makeRedoLogForAddressTableInRound(tableName, round);
}
// this round's redo log data
fetchedDataList.addAll(
Arrays.asList(
format(
"+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
captureTableThisRound, round, cityName, cityName),
format(
"+I[%s, %d, China, %s, %s West Town address 4]",
captureTableThisRound,
417022095255614380L + round,
cityName,
cityName)));
// step 3: assert fetched redo log data in this round
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
// step 4: trigger savepoint
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
}
private void testRemoveTablesOneByOne(
int parallelism,
FailoverType failoverType,
FailoverPhase failoverPhase,
String... captureAddressTables)
throws Exception {
// step 1: create oracle tables with all tables included
initialAddressTables(getJdbcConnection(), captureAddressTables);
final TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
// get all expected data
List<String> fetchedDataList = new ArrayList<>();
for (String table : captureAddressTables) {
String cityName = table.split("_")[1];
fetchedDataList.addAll(
Arrays.asList(
format(
"+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
table, cityName, cityName),
format(
"+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
table, cityName, cityName),
format(
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
table, cityName, cityName)));
}
String finishedSavePointPath = null;
// step 2: execute insert and trigger savepoint with all tables added
{
StreamExecutionEnvironment env =
getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement =
getCreateTableStatement(new HashMap<>(), captureAddressTables);
tEnv.executeSql(createTableStatement);
tEnv.executeSql(
"CREATE TABLE sink ("
+ " TABLE_NAME STRING,"
+ " ID BIGINT,"
+ " COUNTRY STRING,"
+ " CITY STRING,"
+ " DETAIL_ADDRESS STRING,"
+ " primary key (CITY, ID) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")");
TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
JobClient jobClient = tableResult.getJobClient().get();
// trigger failover after some snapshot data read finished
if (failoverPhase == FailoverPhase.SNAPSHOT) {
triggerFailover(
failoverType,
jobClient.getJobID(),
miniClusterResource.getMiniCluster(),
() -> sleepMs(100));
}
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
// test removing table one by one, note that there should be at least one table remaining
for (int round = 0; round < captureAddressTables.length - 1; round++) {
String[] captureTablesThisRound =
Arrays.asList(captureAddressTables)
.subList(round + 1, captureAddressTables.length)
.toArray(new String[0]);
StreamExecutionEnvironment env =
getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement =
getCreateTableStatement(new HashMap<>(), captureTablesThisRound);
tEnv.executeSql(createTableStatement);
tEnv.executeSql(
"CREATE TABLE sink ("
+ " TABLE_NAME STRING,"
+ " ID BIGINT,"
+ " COUNTRY STRING,"
+ " CITY STRING,"
+ " DETAIL_ADDRESS STRING,"
+ " primary key (CITY, ID) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")");
TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
JobClient jobClient = tableResult.getJobClient().get();
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
// step 3: make redo log data for all tables
List<String> expectedRedoLogDataThisRound = new ArrayList<>();
for (int i = 0, captureAddressTablesLength = captureAddressTables.length;
i < captureAddressTablesLength;
i++) {
String tableName = captureAddressTables[i];
makeRedoLogForAddressTableInRound(tableName, round);
if (i <= round) {
continue;
}
String cityName = tableName.split("_")[1];
expectedRedoLogDataThisRound.addAll(
Arrays.asList(
format(
"+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
tableName, round, cityName, cityName),
format(
"+I[%s, %d, China, %s, %s West Town address 4]",
tableName,
417022095255614380L + round,
cityName,
cityName)));
}
if (failoverPhase == FailoverPhase.REDO_LOG
&& TestValuesTableFactory.getRawResults("sink").size()
> fetchedDataList.size()) {
triggerFailover(
failoverType,
jobClient.getJobID(),
miniClusterResource.getMiniCluster(),
() -> sleepMs(100));
}
fetchedDataList.addAll(expectedRedoLogDataThisRound);
// step 4: assert fetched redo log data in this round
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
// step 5: trigger savepoint
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
}
private void testNewlyAddedTableOneByOne(
int parallelism,
FailoverType failoverType,
FailoverPhase failoverPhase,
boolean makeRedoLogBeforeCapture,
String... captureAddressTables)
throws Exception {
testNewlyAddedTableOneByOne(
parallelism,
new HashMap<>(),
failoverType,
failoverPhase,
makeRedoLogBeforeCapture,
captureAddressTables);
}
private void testNewlyAddedTableOneByOne(
int parallelism,
Map<String, String> sourceOptions,
FailoverType failoverType,
FailoverPhase failoverPhase,
boolean makeRedoLogBeforeCapture,
String... captureAddressTables)
throws Exception {
// step 1: create oracle tables with initial data
initialAddressTables(getJdbcConnection(), captureAddressTables);
final TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
// test newly added table one by one
String finishedSavePointPath = null;
List<String> fetchedDataList = new ArrayList<>();
for (int round = 0; round < captureAddressTables.length; round++) {
String[] captureTablesThisRound =
Arrays.asList(captureAddressTables)
.subList(0, round + 1)
.toArray(new String[0]);
String newlyAddedTable = captureAddressTables[round];
if (makeRedoLogBeforeCapture) {
makeRedoLogBeforeCaptureForAddressTable(newlyAddedTable);
}
StreamExecutionEnvironment env =
getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement =
getCreateTableStatement(sourceOptions, captureTablesThisRound);
tEnv.executeSql(createTableStatement);
tEnv.executeSql(
"CREATE TABLE sink ("
+ " TABLE_NAME STRING,"
+ " ID BIGINT,"
+ " COUNTRY STRING,"
+ " CITY STRING,"
+ " DETAIL_ADDRESS STRING,"
+ " primary key (CITY, ID) not enforced"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")");
TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
JobClient jobClient = tableResult.getJobClient().get();
// step 2: assert fetched snapshot data in this round
String cityName = newlyAddedTable.split("_")[1];
List<String> expectedSnapshotDataThisRound =
Arrays.asList(
format(
"+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
newlyAddedTable, cityName, cityName));
if (makeRedoLogBeforeCapture) {
expectedSnapshotDataThisRound =
Arrays.asList(
format(
"+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614381, China, %s, %s West Town address 5]",
newlyAddedTable, cityName, cityName));
}
// trigger failover after some snapshot data read finished
if (failoverPhase == FailoverPhase.SNAPSHOT) {
triggerFailover(
failoverType,
jobClient.getJobID(),
miniClusterResource.getMiniCluster(),
() -> sleepMs(100));
}
fetchedDataList.addAll(expectedSnapshotDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
// step 3: make some redo log data for this round
makeFirstPartRedoLogForAddressTable(newlyAddedTable);
if (failoverPhase == FailoverPhase.REDO_LOG) {
triggerFailover(
failoverType,
jobClient.getJobID(),
miniClusterResource.getMiniCluster(),
() -> sleepMs(100));
}
makeSecondPartRedoLogForAddressTable(newlyAddedTable);
// step 4: assert fetched redo log data in this round
// retract the old data with id 416874195632735147
fetchedDataList =
fetchedDataList.stream()
.filter(
r ->
!r.contains(
format(
"%s, 416874195632735147",
newlyAddedTable)))
.collect(Collectors.toList());
List<String> expectedRedoLogUpsertDataThisRound =
Arrays.asList(
// add the new data with id 416874195632735147
format(
"+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614380, China, %s, %s West Town address 4]",
newlyAddedTable, cityName, cityName));
// step 5: assert fetched redo log data in this round
fetchedDataList.addAll(expectedRedoLogUpsertDataThisRound);
waitForUpsertSinkSize("sink", fetchedDataList.size());
// the result size of sink may arrive fetchedDataList.size() with old data, wait one
// checkpoint to wait retract old record and send new record
Thread.sleep(1000);
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
// step 6: trigger savepoint
if (round != captureAddressTables.length - 1) {
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
}
jobClient.cancel().get();
}
}
private void initialAddressTables(Connection connection, String[] addressTables)
throws SQLException {
try {
connection.setAutoCommit(false);
Statement statement = connection.createStatement();
for (String tableName : addressTables) {
// make initial data for given table
String tableId = ORACLE_SCHEMA + '.' + tableName;
String cityName = tableName.split("_")[1];
statement.execute(
"CREATE TABLE "
+ tableId
+ "("
+ " ID NUMBER(19) NOT NULL,"
+ " COUNTRY VARCHAR(255) NOT NULL,"
+ " CITY VARCHAR(255) NOT NULL,"
+ " DETAIL_ADDRESS VARCHAR(1024),"
+ " PRIMARY KEY(ID)"
+ ")");
statement.execute(
format(
"INSERT INTO %s "
+ "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1')",
tableId, cityName, cityName));
statement.execute(
format(
"INSERT INTO %s "
+ "VALUES (416927583791428523, 'China', '%s', '%s West Town address 2')",
tableId, cityName, cityName));
statement.execute(
format(
"INSERT INTO %s "
+ "VALUES (417022095255614379, 'China', '%s', '%s West Town address 3')",
tableId, cityName, cityName));
}
connection.commit();
} finally {
connection.close();
}
}
private void makeFirstPartRedoLogForAddressTable(String tableName) throws Exception {
String tableId = ORACLE_SCHEMA + '.' + tableName;
executeSql(
format("UPDATE %s SET COUNTRY = 'CHINA' where ID = 416874195632735147", tableId));
}
private void makeSecondPartRedoLogForAddressTable(String tableName) throws Exception {
String tableId = ORACLE_SCHEMA + '.' + tableName;
String cityName = tableName.split("_")[1];
executeSql(
format(
"INSERT INTO %s VALUES(417022095255614380, 'China','%s','%s West Town address 4')",
tableId, cityName, cityName));
}
private void makeRedoLogBeforeCaptureForAddressTable(String tableName) throws Exception {
String tableId = ORACLE_SCHEMA + '.' + tableName;
String cityName = tableName.split("_")[1];
executeSql(
format(
"INSERT INTO %s VALUES(417022095255614381, 'China','%s','%s West Town address 5')",
tableId, cityName, cityName));
}
private void makeRedoLogForAddressTableInRound(String tableName, int round) throws Exception {
String tableId = ORACLE_SCHEMA + '.' + tableName;
String cityName = tableName.split("_")[1];
executeSql(
format(
"UPDATE %s SET COUNTRY = 'CHINA_%s' where id = 416874195632735147",
tableId, round));
executeSql(
format(
"INSERT INTO %s VALUES(%d, 'China','%s','%s West Town address 4')",
tableId, 417022095255614380L + round, cityName, cityName));
}
private void sleepMs(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}
private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory)
throws ExecutionException, InterruptedException {
int retryTimes = 0;
// retry 600 times, it takes 100 milliseconds per time, at most retry 1 minute
while (retryTimes < 600) {
try {
return jobClient.triggerSavepoint(savepointDirectory).get();
} catch (Exception e) {
Optional<CheckpointException> exception =
ExceptionUtils.findThrowable(e, CheckpointException.class);
if (exception.isPresent()
&& exception.get().getMessage().contains("Checkpoint triggering task")) {
Thread.sleep(100);
retryTimes++;
} else {
throw e;
}
}
}
return null;
}
private StreamExecutionEnvironment getStreamExecutionEnvironmentFromSavePoint(
String finishedSavePointPath, int parallelism) throws Exception {
Configuration configuration = new Configuration();
if (finishedSavePointPath != null) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
}
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
return env;
}
private String getCreateTableStatement(
Map<String, String> otherOptions, String... captureTableNames) {
return String.format(
"CREATE TABLE address ("
+ " table_name STRING METADATA VIRTUAL,"
+ " ID BIGINT NOT NULL,"
+ " COUNTRY STRING,"
+ " CITY STRING,"
+ " DETAIL_ADDRESS STRING,"
+ " primary key (CITY, ID) not enforced"
+ ") WITH ("
+ " 'connector' = 'oracle-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'scan.incremental.snapshot.chunk.size' = '2',"
+ " 'scan.newly-added-table.enabled' = 'true',"
+ " 'chunk-meta.group.size' = '2'"
+ " %s"
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
ORACLE_CONTAINER.getUsername(),
ORACLE_CONTAINER.getPassword(),
ORACLE_DATABASE,
ORACLE_SCHEMA,
getTableNameRegex(captureTableNames),
otherOptions.isEmpty()
? ""
: ","
+ otherOptions.entrySet().stream()
.map(
e ->
String.format(
"'%s'='%s'",
e.getKey(), e.getValue()))
.collect(Collectors.joining(",")));
}
private void executeSql(String sql) throws Exception {
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
}
}
}

@ -25,9 +25,9 @@ import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverPhase;
import org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverType;
import org.apache.flink.cdc.connectors.oracle.testutils.TestTable;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@ -58,6 +58,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
@ -707,54 +708,4 @@ public class OracleSourceITCase extends OracleSourceTestBase {
statement.execute(sql);
}
}
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
/** The type of failover. */
private enum FailoverType {
TM,
JM,
NONE
}
/** The phase of failover. */
private enum FailoverPhase {
SNAPSHOT,
REDO_LOG,
NEVER
}
private static void triggerFailover(
FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
throws Exception {
switch (type) {
case TM:
restartTaskManager(miniCluster, afterFailAction);
break;
case JM:
triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
break;
case NONE:
break;
default:
throw new IllegalStateException("Unexpected value: " + type);
}
}
private static void triggerJobManagerFailover(
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
afterFailAction.run();
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}
private static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
throws Exception {
miniCluster.terminateTaskManager(0).get();
afterFailAction.run();
miniCluster.startTaskManager();
}
}

@ -174,7 +174,7 @@ public class OracleSourceTestBase extends TestLogger {
}
// ------------------ utils -----------------------
private static List<TableId> listTables(Connection connection) {
protected static List<TableId> listTables(Connection connection) {
Set<TableId> tableIdSet = new HashSet<>();
String queryTablesSql =

@ -118,7 +118,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -154,7 +155,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -164,6 +166,7 @@ public class OracleTableSourceFactoryTest {
options.put("port", "1521");
options.put("hostname", MY_LOCALHOST);
options.put("debezium.snapshot.mode", "initial");
options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
@ -194,7 +197,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
true);
assertEquals(expectedSource, actualSource);
}
@ -220,6 +224,7 @@ public class OracleTableSourceFactoryTest {
options.put(SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(fetchSize));
options.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(), "true");
options.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.key(), "true");
options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.put(
JdbcSourceOptions.CONNECT_TIMEOUT.key(),
@ -260,6 +265,7 @@ public class OracleTableSourceFactoryTest {
distributionFactorLower,
null,
true,
true,
true);
assertEquals(expectedSource, actualSource);
}
@ -297,7 +303,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -334,7 +341,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -375,7 +383,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "table_name", "schema_name");

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.oracle.testutils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.commons.lang3.StringUtils;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkState;
/** Oracle test utilities. */
public class OracleTestUtils {
/** The type of failover. */
public enum FailoverType {
TM,
JM,
NONE
}
/** The phase of failover. */
public enum FailoverPhase {
SNAPSHOT,
REDO_LOG,
NEVER
}
public static void triggerFailover(
FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
throws Exception {
switch (type) {
case TM:
restartTaskManager(miniCluster, afterFailAction);
break;
case JM:
triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
break;
case NONE:
break;
default:
throw new IllegalStateException("Unexpected value: " + type);
}
}
public static void triggerJobManagerFailover(
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
afterFailAction.run();
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}
public static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
throws Exception {
miniCluster.terminateTaskManager(0).get();
afterFailAction.run();
miniCluster.startTaskManager();
}
public static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
public static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
public static void waitForUpsertSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (upsertSinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
public static int upsertSinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
public static String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {
return captureCustomerTables[0];
} else {
// pattern that matches multiple tables
return format("(%s)", StringUtils.join(captureCustomerTables, "|"));
}
}
}
Loading…
Cancel
Save