[debezium] Fix DebeizumSourceFunction can not do savepoint after close (#2259)

pull/2787/head
Leonard Xu 2 years ago committed by Hang Ruan
parent 8221f51ca8
commit 6dba62ece2

@ -71,6 +71,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.debezium.internal.Handover.ClosedException.isGentlyClosedException;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
@ -304,9 +305,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (handover.hasError()) { if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source"); LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException( if (!isGentlyClosedException(handover.getError())) {
"Call snapshotState() on closed source, checkpoint failed.", throw new FlinkRuntimeException(
handover.getError()); "Call snapshotState() on failed source, checkpoint failed.",
handover.getError());
}
} else { } else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId()); snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState(); snapshotHistoryRecordsState();
@ -591,4 +594,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
public String getEngineInstanceName() { public String getEngineInstanceName() {
return engineInstanceName; return engineInstanceName;
} }
@VisibleForTesting
public Handover getHandover() {
return handover;
}
} }

@ -191,6 +191,8 @@ public class Handover implements Closeable {
if (error == null) { if (error == null) {
error = new ClosedException(); error = new ClosedException();
} else if (!(error instanceof ClosedException)) {
error = new ClosedException("Close handover with error.", error);
} }
lock.notifyAll(); lock.notifyAll();
} }
@ -205,5 +207,20 @@ public class Handover implements Closeable {
public static final class ClosedException extends Exception { public static final class ClosedException extends Exception {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private static final String GENTLY_CLOSED_MESSAGE = "Close handover gently.";
public ClosedException() {
super(GENTLY_CLOSED_MESSAGE);
}
public ClosedException(String message, Throwable cause) {
super(message, cause);
}
public static boolean isGentlyClosedException(Throwable cause) {
return cause instanceof ClosedException
&& GENTLY_CLOSED_MESSAGE.equals(((ClosedException) cause).getMessage());
}
} }
} }

@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.mysql;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.util.FlinkRuntimeException;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
@ -29,6 +30,8 @@ import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.connectors.utils.TestSourceContext; import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer; import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import com.ververica.cdc.debezium.internal.Handover;
import io.debezium.DebeziumException;
import io.debezium.document.Document; import io.debezium.document.Document;
import io.debezium.document.DocumentWriter; import io.debezium.document.DocumentWriter;
import io.debezium.relational.Column; import io.debezium.relational.Column;
@ -915,6 +918,143 @@ public class LegacyMySqlSourceTest extends LegacyMySqlTestBase {
} }
} }
@Test
public void testSnapshotOnClosedSource() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
{
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
// Step-1: start the source from empty state
final DebeziumSourceFunction<SourceRecord> source = createMySqlBinlogSource();
final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
// setup source with empty state
setupSource(source, false, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
// wait until the source finishes the database snapshot
List<SourceRecord> records = drain(sourceContext, 9);
assertEquals(9, records.size());
// state is still empty
assertEquals(0, offsetState.list.size());
assertEquals(0, historyState.list.size());
statement.execute(
"INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)"); // 110
int received = drain(sourceContext, 1).size();
assertEquals(1, received);
// Step-2: trigger a checkpoint
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-1
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
assertTrue(historyState.list.size() > 0);
assertTrue(offsetState.list.size() > 0);
// Step-3: mock the engine stop with savepoint, trigger a
// checkpoint on closed source
final Handover handover = source.getHandover();
handover.close();
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-2
source.snapshotState(new StateSnapshotContextSynchronousImpl(102, 102));
}
assertTrue(historyState.list.size() > 0);
assertTrue(offsetState.list.size() > 0);
source.close();
runThread.sync();
}
}
}
@Test
public void testSnapshotOnFailedSource() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
{
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
// Step-1: start the source from empty state
final DebeziumSourceFunction<SourceRecord> source = createMySqlBinlogSource();
final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
// setup source with empty state
setupSource(source, false, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
// wait until the source finishes the database snapshot
List<SourceRecord> records = drain(sourceContext, 9);
assertEquals(9, records.size());
// state is still empty
assertEquals(0, offsetState.list.size());
assertEquals(0, historyState.list.size());
statement.execute(
"INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)"); // 110
int received = drain(sourceContext, 1).size();
assertEquals(1, received);
// Step-2: trigger a checkpoint
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-1
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
assertTrue(historyState.list.size() > 0);
assertTrue(offsetState.list.size() > 0);
// Step-3: mock the engine stop due to underlying debezium exception, trigger a
// checkpoint on failed source
final Handover handover = source.getHandover();
handover.reportError(new DebeziumException("Mocked debezium exception"));
handover.close();
try {
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-2
source.snapshotState(new StateSnapshotContextSynchronousImpl(102, 102));
}
fail("Should fail.");
} catch (Exception e) {
assertTrue(e instanceof FlinkRuntimeException);
assertTrue(
e.getMessage()
.contains(
"Call snapshotState() on failed source, checkpoint failed."));
assertTrue(e.getCause() instanceof Handover.ClosedException);
assertTrue(e.getCause().getMessage().contains("Close handover with error."));
} finally {
source.close();
runThread.sync();
}
}
}
}
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
// Public Utilities // Public Utilities
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------

Loading…
Cancel
Save