[ISSUE-69] DebeziumSourceFunction only stores monitored table DDLs which cause error in some situation

This closes #70
release-1.1
Zhenghua Gao 4 years ago committed by Jark Wu
parent 3dc2f0d5cf
commit 5eda7e485d
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -266,8 +266,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
// see https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector // see https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
// and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/ // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName()); properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
// reduce the history records to store
properties.setProperty("database.history.store.only.monitored.tables.ddl", "true");
if (engineInstanceName == null) { if (engineInstanceName == null) {
// not restore from recovery // not restore from recovery
engineInstanceName = UUID.randomUUID().toString(); engineInstanceName = UUID.randomUUID().toString();

@ -324,14 +324,101 @@ public class MySQLSourceTest extends MySQLTestBase {
} }
} }
@Test
public void testRecoverFromRenameOperation() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
{
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
// Step-1: start the source from empty state
final DebeziumSourceFunction<SourceRecord> source = createMySqlBinlogSource();
final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
// setup source with empty state
setupSource(source, false, offsetState, historyState, true, 0, 1);
final CheckedThread runThread = new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
// wait until the source finishes the database snapshot
List<SourceRecord> records = drain(sourceContext, 9);
assertEquals(9, records.size());
// state is still empty
assertEquals(0, offsetState.list.size());
assertEquals(0, historyState.list.size());
// create temporary tables which are not in the whitelist
statement.execute("CREATE TABLE `tp_001_ogt_products` LIKE `products`;");
// do some renames
statement.execute("RENAME TABLE `products` TO `tp_001_del_products`, `tp_001_ogt_products` TO `products`;");
statement.execute("INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)"); // 110
statement.execute("INSERT INTO `products` VALUES (111,'stream train','Town stream train',1.304)"); // 111
statement.execute("INSERT INTO `products` VALUES (112,'cargo train','City cargo train',1.304)"); // 112
int received = drain(sourceContext, 3).size();
assertEquals(3, received);
// Step-2: trigger a checkpoint
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-1
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
assertTrue(historyState.list.size() > 0);
assertTrue(offsetState.list.size() > 0);
source.cancel();
source.close();
runThread.sync();
}
}
{
// Step-3: restore the source from state
final DebeziumSourceFunction<SourceRecord> source2 = createMySqlBinlogSource();
final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
setupSource(source2, true, offsetState, historyState, true, 0, 1);
final CheckedThread runThread2 = new CheckedThread() {
@Override
public void go() throws Exception {
source2.run(sourceContext2);
}
};
runThread2.start();
// make sure there is no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2));
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("INSERT INTO `products` VALUES (113,'Airplane','Toy airplane',1.304)"); // 113
List<SourceRecord> records = drain(sourceContext2, 1);
assertEquals(1, records.size());
assertInsert(records.get(0), "id", 113);
source2.cancel();
source2.close();
runThread2.sync();
}
}
}
private void assertHistoryState(TestingListState<String> historyState) { private void assertHistoryState(TestingListState<String> historyState) {
// assert the DDL is stored in the history state // assert the DDL is stored in the history state
assertEquals(4, historyState.list.size()); assertTrue(historyState.list.size() > 0);
String lastHistory = historyState.list.get(3); boolean hasDDL = historyState.list.stream().skip(1).anyMatch(history ->
assertEquals("mysql_binlog_source", JsonPath.read(lastHistory, "$.source.server")); JsonPath.read(history, "$.source.server").equals("mysql_binlog_source")
assertEquals("true", JsonPath.read(lastHistory, "$.position.snapshot").toString()); && JsonPath.read(history, "$.position.snapshot").toString().equals("true")
assertTrue(JsonPath.read(lastHistory, "$.databaseName").toString().startsWith("inventory")); && JsonPath.read(history, "$.ddl").toString().startsWith("CREATE TABLE `products`"));
assertTrue(JsonPath.read(lastHistory, "$.ddl").toString().startsWith("CREATE TABLE `products`")); assertTrue(hasDDL);
} }
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------

Loading…
Cancel
Save