From 6dba62ece2c068daf07e8fc91472f6a820da55e2 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Fri, 21 Jul 2023 09:57:48 +0800 Subject: [PATCH] [debezium] Fix DebeizumSourceFunction can not do savepoint after close (#2259) --- .../cdc/debezium/DebeziumSourceFunction.java | 14 +- .../cdc/debezium/internal/Handover.java | 17 +++ .../mysql/LegacyMySqlSourceTest.java | 140 ++++++++++++++++++ 3 files changed, 168 insertions(+), 3 deletions(-) diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java index a473391d2..76ac9dcf5 100644 --- a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java +++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java @@ -71,6 +71,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import static com.ververica.cdc.debezium.internal.Handover.ClosedException.isGentlyClosedException; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; @@ -304,9 +305,11 @@ public class DebeziumSourceFunction extends RichSourceFunction public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { if (handover.hasError()) { LOG.debug("snapshotState() called on closed source"); - throw new FlinkRuntimeException( - "Call snapshotState() on closed source, checkpoint failed.", - handover.getError()); + if (!isGentlyClosedException(handover.getError())) { + throw new FlinkRuntimeException( + "Call snapshotState() on failed source, checkpoint failed.", + handover.getError()); + } } else { snapshotOffsetState(functionSnapshotContext.getCheckpointId()); snapshotHistoryRecordsState(); @@ -591,4 +594,9 @@ public class DebeziumSourceFunction extends RichSourceFunction public String getEngineInstanceName() { return engineInstanceName; } + + @VisibleForTesting + public Handover getHandover() { + return handover; + } } diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/internal/Handover.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/internal/Handover.java index 11d2e6ae8..f7e33b52c 100644 --- a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/internal/Handover.java +++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/internal/Handover.java @@ -191,6 +191,8 @@ public class Handover implements Closeable { if (error == null) { error = new ClosedException(); + } else if (!(error instanceof ClosedException)) { + error = new ClosedException("Close handover with error.", error); } lock.notifyAll(); } @@ -205,5 +207,20 @@ public class Handover implements Closeable { public static final class ClosedException extends Exception { private static final long serialVersionUID = 1L; + + private static final String GENTLY_CLOSED_MESSAGE = "Close handover gently."; + + public ClosedException() { + super(GENTLY_CLOSED_MESSAGE); + } + + public ClosedException(String message, Throwable cause) { + super(message, cause); + } + + public static boolean isGentlyClosedException(Throwable cause) { + return cause instanceof ClosedException + && GENTLY_CLOSED_MESSAGE.equals(((ClosedException) cause).getMessage()); + } } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/LegacyMySqlSourceTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/LegacyMySqlSourceTest.java index e257628e8..a511a81ba 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/LegacyMySqlSourceTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/LegacyMySqlSourceTest.java @@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.mysql; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.util.FlinkRuntimeException; import com.fasterxml.jackson.core.JsonParseException; import com.jayway.jsonpath.JsonPath; @@ -29,6 +30,8 @@ import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import com.ververica.cdc.connectors.utils.TestSourceContext; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer; +import com.ververica.cdc.debezium.internal.Handover; +import io.debezium.DebeziumException; import io.debezium.document.Document; import io.debezium.document.DocumentWriter; import io.debezium.relational.Column; @@ -915,6 +918,143 @@ public class LegacyMySqlSourceTest extends LegacyMySqlTestBase { } } + @Test + public void testSnapshotOnClosedSource() throws Exception { + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + + { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // Step-1: start the source from empty state + final DebeziumSourceFunction source = createMySqlBinlogSource(); + final TestSourceContext sourceContext = new TestSourceContext<>(); + // setup source with empty state + setupSource(source, false, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + // wait until the source finishes the database snapshot + List records = drain(sourceContext, 9); + assertEquals(9, records.size()); + + // state is still empty + assertEquals(0, offsetState.list.size()); + assertEquals(0, historyState.list.size()); + + statement.execute( + "INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)"); // 110 + + int received = drain(sourceContext, 1).size(); + assertEquals(1, received); + + // Step-2: trigger a checkpoint + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + + assertTrue(historyState.list.size() > 0); + assertTrue(offsetState.list.size() > 0); + + // Step-3: mock the engine stop with savepoint, trigger a + // checkpoint on closed source + final Handover handover = source.getHandover(); + handover.close(); + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-2 + source.snapshotState(new StateSnapshotContextSynchronousImpl(102, 102)); + } + + assertTrue(historyState.list.size() > 0); + assertTrue(offsetState.list.size() > 0); + + source.close(); + runThread.sync(); + } + } + } + + @Test + public void testSnapshotOnFailedSource() throws Exception { + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + + { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // Step-1: start the source from empty state + final DebeziumSourceFunction source = createMySqlBinlogSource(); + final TestSourceContext sourceContext = new TestSourceContext<>(); + // setup source with empty state + setupSource(source, false, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + // wait until the source finishes the database snapshot + List records = drain(sourceContext, 9); + assertEquals(9, records.size()); + + // state is still empty + assertEquals(0, offsetState.list.size()); + assertEquals(0, historyState.list.size()); + + statement.execute( + "INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)"); // 110 + + int received = drain(sourceContext, 1).size(); + assertEquals(1, received); + + // Step-2: trigger a checkpoint + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + + assertTrue(historyState.list.size() > 0); + assertTrue(offsetState.list.size() > 0); + + // Step-3: mock the engine stop due to underlying debezium exception, trigger a + // checkpoint on failed source + final Handover handover = source.getHandover(); + handover.reportError(new DebeziumException("Mocked debezium exception")); + handover.close(); + try { + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-2 + source.snapshotState(new StateSnapshotContextSynchronousImpl(102, 102)); + } + fail("Should fail."); + } catch (Exception e) { + assertTrue(e instanceof FlinkRuntimeException); + assertTrue( + e.getMessage() + .contains( + "Call snapshotState() on failed source, checkpoint failed.")); + assertTrue(e.getCause() instanceof Handover.ClosedException); + assertTrue(e.getCause().getMessage().contains("Close handover with error.")); + } finally { + source.close(); + runThread.sync(); + } + } + } + } + // ------------------------------------------------------------------------------------------ // Public Utilities // ------------------------------------------------------------------------------------------