[mysql] Catch and hold exception in chunk splitter thread and rethrow in getNext() (#1110)

pull/1118/head
Qingsheng Ren 3 years ago committed by GitHub
parent dbd87b9a01
commit 007c616406
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -75,13 +75,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private final int currentParallelism;
private final List<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;
private final Object lock = new Object();
private volatile Throwable uncaughtSplitterException;
private AssignerStatus assignerStatus;
private ChunkSplitter chunkSplitter;
private boolean isTableIdCaseSensitive;
private ExecutorService executor;
private Object lock;
@Nullable private Long checkpointIdToFinish;
@ -145,7 +145,6 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
@Override
public void open() {
lock = new Object();
chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive);
// the legacy state didn't snapshot remaining tables, discovery remaining table here
@ -198,26 +197,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
}
executor.submit(
() -> {
Iterator<TableId> iterator = remainingTables.iterator();
while (iterator.hasNext()) {
TableId nextTable = iterator.next();
// split the given table into chunks (snapshot splits)
Collection<MySqlSnapshotSplit> splits =
chunkSplitter.generateSplits(nextTable);
synchronized (lock) {
remainingSplits.addAll(splits);
remainingTables.remove(nextTable);
lock.notify();
}
}
});
executor.submit(this::splitChunksForRemainingTables);
}
}
@Override
public Optional<MySqlSplit> getNext() {
checkSplitterErrors();
synchronized (lock) {
if (!remainingSplits.isEmpty()) {
// return remaining splits firstly
@ -397,6 +383,37 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
return noMoreSplits() && assignedSplits.size() == splitFinishedOffsets.size();
}
private void splitChunksForRemainingTables() {
try {
for (TableId nextTable : remainingTables) {
// split the given table into chunks (snapshot splits)
Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
synchronized (lock) {
remainingSplits.addAll(splits);
remainingTables.remove(nextTable);
lock.notify();
}
}
} catch (Exception e) {
if (uncaughtSplitterException == null) {
uncaughtSplitterException = e;
} else {
uncaughtSplitterException.addSuppressed(e);
}
// Release the potential waiting getNext() call
synchronized (lock) {
lock.notify();
}
}
}
private void checkSplitterErrors() {
if (uncaughtSplitterException != null) {
throw new FlinkRuntimeException(
"Chunk splitting has encountered exception", uncaughtSplitterException);
}
}
private static ChunkSplitter createChunkSplitter(
MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) {
MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, isTableIdCaseSensitive);

@ -59,6 +59,7 @@ import java.util.concurrent.ExecutionException;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertTrue;
/** IT tests for {@link MySqlSource}. */
public class MySqlSourceITCase extends MySqlSourceTestBase {
@ -248,6 +249,22 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
"address_beijing");
}
@Test
public void testConsumingTableWithoutPrimaryKey() {
try {
testMySqlParallelSource(
1, FailoverType.NONE, FailoverPhase.NEVER, new String[] {"customers_no_pk"});
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
e,
String.format(
"Incremental snapshot for tables requires primary key, but table %s doesn't have primary key",
customDatabase.getDatabaseName() + ".customers_no_pk"))
.isPresent());
}
}
private void testMySqlParallelSource(
FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
throws Exception {

@ -330,6 +330,26 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
}
}
@Test
public void testTableWithoutPrimaryKey() {
String tableWithoutPrimaryKey = customerDatabase.getDatabaseName() + ".customers_no_pk";
try {
getTestAssignSnapshotSplits(
4,
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {tableWithoutPrimaryKey});
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
String.format(
"Incremental snapshot for tables requires primary key, but table %s doesn't have primary key",
tableWithoutPrimaryKey))
.isPresent());
}
}
private List<String> getTestAssignSnapshotSplits(
int splitSize,
double distributionFactorUpper,

@ -137,6 +137,36 @@ VALUES (1,'user_1','Shanghai','123567891234'),
(3,'user_9','Shanghai','123567891234'),
(3,'user_10','Shanghai','123567891234');
CREATE TABLE customers_no_pk (
id INTEGER NOT NULL,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers_no_pk
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234"),
(110,"user_5","Shanghai","123567891234"),
(111,"user_6","Shanghai","123567891234"),
(118,"user_7","Shanghai","123567891234"),
(121,"user_8","Shanghai","123567891234"),
(123,"user_9","Shanghai","123567891234"),
(1009,"user_10","Shanghai","123567891234"),
(1010,"user_11","Shanghai","123567891234"),
(1011,"user_12","Shanghai","123567891234"),
(1012,"user_13","Shanghai","123567891234"),
(1013,"user_14","Shanghai","123567891234"),
(1014,"user_15","Shanghai","123567891234"),
(1015,"user_16","Shanghai","123567891234"),
(1016,"user_17","Shanghai","123567891234"),
(1017,"user_18","Shanghai","123567891234"),
(1018,"user_19","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");
-- table has combined primary key
CREATE TABLE customer_card (
card_no BIGINT NOT NULL,

Loading…
Cancel
Save