[mysql] Throw RuntimeException timely in snapshot scan phase (#1098)

pull/1109/head
Leonard Xu 3 years ago committed by GitHub
parent 893e934f74
commit 7eb9247122
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -232,6 +232,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
boolean reachBinlogEnd = false;
final List<SourceRecord> sourceRecords = new ArrayList<>();
while (!reachBinlogEnd) {
checkReadException();
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
sourceRecords.add(event.getRecord());

@ -20,6 +20,8 @@ package com.ververica.cdc.connectors.mysql.debezium.reader;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
@ -44,6 +46,9 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Tests for {@link SnapshotSplitReader}. */
public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
@ -171,6 +176,34 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@Test
public void testThrowRuntimeExceptionInSnapshotScan() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(new String[] {"customer_card", "customers_1"}, 10);
DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
DataTypes.FIELD("level", DataTypes.STRING()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("note", DataTypes.STRING()));
List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);
// DROP one table to mock snapshot scan error
String tableToDrop = String.format("%s.customers_1", customerDatabase.getDatabaseName());
mySqlConnection.execute("DROP TABLE IF EXISTS " + tableToDrop);
mySqlConnection.commit();
String exceptionMessage = String.format("Snapshotting of table %s failed.", tableToDrop);
try {
readTableSnapshotSplits(mySqlSplits, sourceConfig, mySqlSplits.size(), dataType);
fail("Should fail.");
} catch (Exception e) {
assertTrue(e instanceof FlinkRuntimeException);
assertTrue(ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent());
}
}
private List<String> readTableSnapshotSplits(
List<MySqlSplit> mySqlSplits,
MySqlSourceConfig sourceConfig,

Loading…
Cancel
Save