From 007c61640668a3733dced7e44182f7c74c11957a Mon Sep 17 00:00:00 2001
From: Qingsheng Ren
Date: Sat, 23 Apr 2022 09:06:46 +0800
Subject: [PATCH] [mysql] Catch and hold exception in chunk splitter thread and rethrow in getNext() (#1110)

---
 .../assigners/MySqlSnapshotSplitAssigner.java | 53 ++++++++++++-------
 .../mysql/source/MySqlSourceITCase.java       | 17 ++++++
 .../MySqlSnapshotSplitAssignerTest.java       | 20 +++++++
 .../src/test/resources/ddl/customer.sql       | 30 +++++++++++
 4 files changed, 102 insertions(+), 18 deletions(-)

diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index a10bb551d..abf6e7979 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -75,13 +75,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
     private final int currentParallelism;
     private final List<TableId> remainingTables;
     private final boolean isRemainingTablesCheckpointed;
+    private final Object lock = new Object();
+    private volatile Throwable uncaughtSplitterException;
 
     private AssignerStatus assignerStatus;
     private ChunkSplitter chunkSplitter;
     private boolean isTableIdCaseSensitive;
-    private ExecutorService executor;
-    private Object lock;
 
     @Nullable private Long checkpointIdToFinish;
@@ -145,7 +145,6 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
 
     @Override
     public void open() {
-        lock = new Object();
         chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive);
 
         // the legacy state didn't snapshot remaining tables, discovery remaining table here
@@ -198,26 +197,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
                 this.executor = Executors.newSingleThreadExecutor(threadFactory);
             }
 
-            executor.submit(
-                    () -> {
-                        Iterator<TableId> iterator = remainingTables.iterator();
-                        while (iterator.hasNext()) {
-                            TableId nextTable = iterator.next();
-                            // split the given table into chunks (snapshot splits)
-                            Collection<MySqlSnapshotSplit> splits =
-                                    chunkSplitter.generateSplits(nextTable);
-                            synchronized (lock) {
-                                remainingSplits.addAll(splits);
-                                remainingTables.remove(nextTable);
-                                lock.notify();
-                            }
-                        }
-                    });
+            executor.submit(this::splitChunksForRemainingTables);
         }
     }
 
     @Override
     public Optional<MySqlSplit> getNext() {
+        checkSplitterErrors();
         synchronized (lock) {
             if (!remainingSplits.isEmpty()) {
                 // return remaining splits firstly
@@ -397,6 +383,37 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
         return noMoreSplits() && assignedSplits.size() == splitFinishedOffsets.size();
     }
 
+    private void splitChunksForRemainingTables() {
+        try {
+            for (TableId nextTable : remainingTables) {
+                // split the given table into chunks (snapshot splits)
+                Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
+                synchronized (lock) {
+                    remainingSplits.addAll(splits);
+                    remainingTables.remove(nextTable);
+                    lock.notify();
+                }
+            }
+        } catch (Exception e) {
+            if (uncaughtSplitterException == null) {
+                uncaughtSplitterException = e;
+            } else {
+                uncaughtSplitterException.addSuppressed(e);
+            }
+            // Release the potential waiting getNext() call
+            synchronized (lock) {
+                lock.notify();
+            }
+        }
+    }
+
+    private void checkSplitterErrors() {
+        if (uncaughtSplitterException != null) {
+            throw new FlinkRuntimeException(
+                    "Chunk splitting has encountered exception", uncaughtSplitterException);
+        }
+    }
+
     private static ChunkSplitter createChunkSplitter(
             MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) {
         MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, isTableIdCaseSensitive);
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
index d8195f89a..d3f47d1d9 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -59,6 +59,7 @@ import java.util.concurrent.ExecutionException;
 
 import static java.lang.String.format;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
 
 /** IT tests for {@link MySqlSource}. */
 public class MySqlSourceITCase extends MySqlSourceTestBase {
@@ -248,6 +249,22 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
                 "address_beijing");
     }
 
+    @Test
+    public void testConsumingTableWithoutPrimaryKey() {
+        try {
+            testMySqlParallelSource(
+                    1, FailoverType.NONE, FailoverPhase.NEVER, new String[] {"customers_no_pk"});
+        } catch (Exception e) {
+            assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(
+                                    e,
+                                    String.format(
+                                            "Incremental snapshot for tables requires primary key, but table %s doesn't have primary key",
+                                            customDatabase.getDatabaseName() + ".customers_no_pk"))
+                            .isPresent());
+        }
+    }
+
     private void testMySqlParallelSource(
             FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
             throws Exception {
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 02e58e00e..51e49a886 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -330,6 +330,26 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
         }
     }
 
+    @Test
+    public void testTableWithoutPrimaryKey() {
+        String tableWithoutPrimaryKey = customerDatabase.getDatabaseName() + ".customers_no_pk";
+        try {
+            getTestAssignSnapshotSplits(
+                    4,
+                    SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                    SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                    new String[] {tableWithoutPrimaryKey});
+        } catch (Throwable t) {
+            assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(
+                                    t,
+                                    String.format(
+                                            "Incremental snapshot for tables requires primary key, but table %s doesn't have primary key",
+                                            tableWithoutPrimaryKey))
+                            .isPresent());
+        }
+    }
+
     private List getTestAssignSnapshotSplits(
             int splitSize,
             double distributionFactorUpper,
diff --git a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
index 7d3fe8922..083abfe45 100644
--- a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
+++ b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
@@ -137,6 +137,36 @@ VALUES (1,'user_1','Shanghai','123567891234'),
        (3,'user_9','Shanghai','123567891234'),
        (3,'user_10','Shanghai','123567891234');
 
+CREATE TABLE customers_no_pk (
+  id INTEGER NOT NULL,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  address VARCHAR(1024),
+  phone_number VARCHAR(512)
+);
+
+INSERT INTO customers_no_pk
+VALUES (101,"user_1","Shanghai","123567891234"),
+       (102,"user_2","Shanghai","123567891234"),
+       (103,"user_3","Shanghai","123567891234"),
+       (109,"user_4","Shanghai","123567891234"),
+       (110,"user_5","Shanghai","123567891234"),
+       (111,"user_6","Shanghai","123567891234"),
+       (118,"user_7","Shanghai","123567891234"),
+       (121,"user_8","Shanghai","123567891234"),
+       (123,"user_9","Shanghai","123567891234"),
+       (1009,"user_10","Shanghai","123567891234"),
+       (1010,"user_11","Shanghai","123567891234"),
+       (1011,"user_12","Shanghai","123567891234"),
+       (1012,"user_13","Shanghai","123567891234"),
+       (1013,"user_14","Shanghai","123567891234"),
+       (1014,"user_15","Shanghai","123567891234"),
+       (1015,"user_16","Shanghai","123567891234"),
+       (1016,"user_17","Shanghai","123567891234"),
+       (1017,"user_18","Shanghai","123567891234"),
+       (1018,"user_19","Shanghai","123567891234"),
+       (1019,"user_20","Shanghai","123567891234"),
+       (2000,"user_21","Shanghai","123567891234");
+
 -- table has combined primary key
 CREATE TABLE customer_card (
   card_no BIGINT NOT NULL,
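
The core idea of the change, reduced to a minimal, self-contained sketch: chunk splitting runs on a single-threaded executor, any failure is parked in a volatile field instead of dying silently with the worker thread, and the next getNext() call rethrows it on the thread that polls for splits, where the framework can actually fail the job. The class below is illustrative only; its names (BackgroundSplitterSketch, startSplitting, String-typed splits, splittingFinished) and the plain RuntimeException are placeholders, not the connector's actual API, which uses ChunkSplitter, MySqlSnapshotSplit, and FlinkRuntimeException as shown in the diff above.

import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative sketch of the "hold splitter errors and rethrow on getNext()" pattern. */
public class BackgroundSplitterSketch {

    private final Object lock = new Object();
    private final Queue<String> remainingSplits = new ArrayDeque<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    private volatile Throwable uncaughtSplitterException;
    private volatile boolean splittingFinished;

    /** Starts asynchronous splitting; failures are held instead of being lost with the thread. */
    public void startSplitting(List<String> tables) {
        executor.submit(
                () -> {
                    try {
                        for (String table : tables) {
                            // A real splitter would produce many chunk splits per table.
                            synchronized (lock) {
                                remainingSplits.add(table + "-split-0");
                                lock.notify();
                            }
                        }
                        synchronized (lock) {
                            splittingFinished = true;
                            lock.notify();
                        }
                    } catch (Exception e) {
                        // Park the failure; the polling thread will surface it.
                        if (uncaughtSplitterException == null) {
                            uncaughtSplitterException = e;
                        } else {
                            uncaughtSplitterException.addSuppressed(e);
                        }
                        synchronized (lock) {
                            lock.notify(); // release a potentially waiting getNext()
                        }
                    }
                });
    }

    /** Rethrows any held splitter failure, otherwise waits for and returns the next split. */
    public Optional<String> getNext() throws InterruptedException {
        checkSplitterErrors();
        synchronized (lock) {
            while (remainingSplits.isEmpty()
                    && !splittingFinished
                    && uncaughtSplitterException == null) {
                lock.wait();
            }
            checkSplitterErrors();
            return Optional.ofNullable(remainingSplits.poll());
        }
    }

    private void checkSplitterErrors() {
        if (uncaughtSplitterException != null) {
            throw new RuntimeException(
                    "Chunk splitting has encountered exception", uncaughtSplitterException);
        }
    }
}

Rethrowing inside getNext() rather than only logging in the background task matters because getNext() runs on the caller that can surface the error; the extra lock.notify() in the catch block keeps a waiting getNext() from blocking forever on splits that will never arrive.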