diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index ddbbbea7b..f4c4a308f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -232,6 +232,7 @@ public class SnapshotSplitReader implements DebeziumReader sourceRecords = new ArrayList<>(); while (!reachBinlogEnd) { + checkReadException(); List batch = queue.poll(); for (DataChangeEvent event : batch) { sourceRecords.add(event.getRecord()); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 1e2066d59..9a81b310d 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -20,6 +20,8 @@ package com.ververica.cdc.connectors.mysql.debezium.reader; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; @@ -44,6 +46,9 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** Tests for {@link SnapshotSplitReader}. */ public class SnapshotSplitReaderTest extends MySqlSourceTestBase { @@ -171,6 +176,34 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase { assertEqualsInAnyOrder(Arrays.asList(expected), actual); } + @Test + public void testThrowRuntimeExceptionInSnapshotScan() throws Exception { + MySqlSourceConfig sourceConfig = + getConfig(new String[] {"customer_card", "customers_1"}, 10); + + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("card_no", DataTypes.BIGINT()), + DataTypes.FIELD("level", DataTypes.STRING()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("note", DataTypes.STRING())); + List mySqlSplits = getMySqlSplits(sourceConfig); + + // DROP one table to mock snapshot scan error + String tableToDrop = String.format("%s.customers_1", customerDatabase.getDatabaseName()); + mySqlConnection.execute("DROP TABLE IF EXISTS " + tableToDrop); + mySqlConnection.commit(); + + String exceptionMessage = String.format("Snapshotting of table %s failed.", tableToDrop); + try { + readTableSnapshotSplits(mySqlSplits, sourceConfig, mySqlSplits.size(), dataType); + fail("Should fail."); + } catch (Exception e) { + assertTrue(e instanceof FlinkRuntimeException); + assertTrue(ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent()); + } + } + private List readTableSnapshotSplits( List mySqlSplits, MySqlSourceConfig sourceConfig,