From e2d4ff735c7a94fa5073622260a78e5c0229ee97 Mon Sep 17 00:00:00 2001 From: Zhenghua Gao Date: Fri, 18 Dec 2020 11:06:38 +0800 Subject: [PATCH] [ISSUE-69] DebeziumSourceFunction only stores monitored table DDLs which cause error in some situation This closes #70 --- .../cdc/debezium/DebeziumSourceFunction.java | 2 - .../cdc/connectors/mysql/MySQLSourceTest.java | 99 +++++++++++++++++-- 2 files changed, 93 insertions(+), 8 deletions(-) diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java index 040e6a892..022f81cf4 100644 --- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java @@ -266,8 +266,6 @@ public class DebeziumSourceFunction extends RichSourceFunction implements // see https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/ properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName()); - // reduce the history records to store - properties.setProperty("database.history.store.only.monitored.tables.ddl", "true"); if (engineInstanceName == null) { // not restore from recovery engineInstanceName = UUID.randomUUID().toString(); diff --git a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSourceTest.java b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSourceTest.java index 6d2a50f6e..431bd3868 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSourceTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSourceTest.java @@ -324,14 +324,101 @@ public class MySQLSourceTest extends MySQLTestBase { } } + @Test + public void testRecoverFromRenameOperation() throws Exception { + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + + { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // Step-1: start the source from empty state + final DebeziumSourceFunction source = createMySqlBinlogSource(); + final TestSourceContext sourceContext = new TestSourceContext<>(); + // setup source with empty state + setupSource(source, false, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + // wait until the source finishes the database snapshot + List records = drain(sourceContext, 9); + assertEquals(9, records.size()); + + // state is still empty + assertEquals(0, offsetState.list.size()); + assertEquals(0, historyState.list.size()); + + // create temporary tables which are not in the whitelist + statement.execute("CREATE TABLE `tp_001_ogt_products` LIKE `products`;"); + // do some renames + statement.execute("RENAME TABLE `products` TO `tp_001_del_products`, `tp_001_ogt_products` TO `products`;"); + + statement.execute("INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)"); // 110 + statement.execute("INSERT INTO `products` VALUES (111,'stream train','Town stream train',1.304)"); // 111 + statement.execute("INSERT INTO `products` VALUES (112,'cargo train','City cargo train',1.304)"); // 112 + + int received = drain(sourceContext, 3).size(); + assertEquals(3, received); + + // Step-2: trigger a checkpoint + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + + assertTrue(historyState.list.size() > 0); + assertTrue(offsetState.list.size() > 0); + + source.cancel(); + source.close(); + runThread.sync(); + } + } + + { + // Step-3: restore the source from state + final DebeziumSourceFunction source2 = createMySqlBinlogSource(); + final TestSourceContext sourceContext2 = new TestSourceContext<>(); + setupSource(source2, true, offsetState, historyState, true, 0, 1); + final CheckedThread runThread2 = new CheckedThread() { + @Override + public void go() throws Exception { + source2.run(sourceContext2); + } + }; + runThread2.start(); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2)); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("INSERT INTO `products` VALUES (113,'Airplane','Toy airplane',1.304)"); // 113 + List records = drain(sourceContext2, 1); + assertEquals(1, records.size()); + assertInsert(records.get(0), "id", 113); + + source2.cancel(); + source2.close(); + runThread2.sync(); + } + } + } + private void assertHistoryState(TestingListState historyState) { // assert the DDL is stored in the history state - assertEquals(4, historyState.list.size()); - String lastHistory = historyState.list.get(3); - assertEquals("mysql_binlog_source", JsonPath.read(lastHistory, "$.source.server")); - assertEquals("true", JsonPath.read(lastHistory, "$.position.snapshot").toString()); - assertTrue(JsonPath.read(lastHistory, "$.databaseName").toString().startsWith("inventory")); - assertTrue(JsonPath.read(lastHistory, "$.ddl").toString().startsWith("CREATE TABLE `products`")); + assertTrue(historyState.list.size() > 0); + boolean hasDDL = historyState.list.stream().skip(1).anyMatch(history -> + JsonPath.read(history, "$.source.server").equals("mysql_binlog_source") + && JsonPath.read(history, "$.position.snapshot").toString().equals("true") + && JsonPath.read(history, "$.ddl").toString().startsWith("CREATE TABLE `products`")); + assertTrue(hasDDL); } // ------------------------------------------------------------------------------------------