[FLINK-35600][pipeline-connector/mysql] Add timestamp for low and high watermark

This closes #3415
pull/3868/head
wudi 2 weeks ago committed by GitHub
parent afd4a74c0a
commit 2fa215e5c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -154,6 +154,11 @@ public class MySqlSnapshotSplitReadTask
hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit); hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit);
} }
final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
lowWatermark
.getOffset()
.put(
BinlogOffset.TIMESTAMP_KEY,
String.valueOf(clock.currentTime().getEpochSecond()));
LOG.info( LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}", "Snapshot step 1 - Determining low watermark {} for split {}",
lowWatermark, lowWatermark,
@ -187,6 +192,11 @@ public class MySqlSnapshotSplitReadTask
} else { } else {
// Get the current binlog offset as HW // Get the current binlog offset as HW
highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
highWatermark
.getOffset()
.put(
BinlogOffset.TIMESTAMP_KEY,
String.valueOf(clock.currentTime().getEpochSecond()));
} }
LOG.info( LOG.info(

@ -251,9 +251,14 @@ public class BinlogOffset implements Comparable<BinlogOffset>, Serializable {
} }
// The completed events are the same, so compare the row number ... // The completed events are the same, so compare the row number ...
if (this.getRestartSkipRows() != that.getRestartSkipRows()) {
return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows()); return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
} }
// The skip rows are the same, so compare the timestamp ...
return Long.compare(this.getTimestampSec(), that.getTimestampSec());
}
public boolean isAtOrBefore(BinlogOffset that) { public boolean isAtOrBefore(BinlogOffset that) {
return this.compareTo(that) <= 0; return this.compareTo(that) <= 0;
} }

@ -107,6 +107,11 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
private final UniqueDatabase inventoryDatabase8 = private final UniqueDatabase inventoryDatabase8 =
new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD); new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);
private static final MySqlContainer MYSQL_CONTAINER_NOGTID =
createMySqlContainer(MySqlVersion.V5_7, "docker/server/my.cnf");
private final UniqueDatabase customerDatabaseNoGtid =
new UniqueDatabase(MYSQL_CONTAINER_NOGTID, "customer", TEST_USER, TEST_PASSWORD);
private BinaryLogClient binaryLogClient; private BinaryLogClient binaryLogClient;
private MySqlConnection mySqlConnection; private MySqlConnection mySqlConnection;
@ -891,6 +896,69 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
assertThat(eventFilter.test(event)).isFalse(); assertThat(eventFilter.test(event)).isFalse();
} }
@Test
public void testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception {
Startables.deepStart(Stream.of(MYSQL_CONTAINER_NOGTID)).join();
// Preparations
customerDatabaseNoGtid.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(
MYSQL_CONTAINER_NOGTID, customerDatabaseNoGtid, new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
// step-1: split snapshot
List<MySqlSnapshotSplit> snapshotSplits =
getMySqlSplits(new String[] {"customers"}, sourceConfig, customerDatabaseNoGtid);
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
final SnapshotSplitReader snapshotSplitReader =
new SnapshotSplitReader(statefulTaskContext, 0);
// step-1: read snapshot splits firstly
List<SourceRecord> snapshotRecords = new ArrayList<>();
for (int i = 0; i < snapshotSplits.size(); i++) {
MySqlSplit sqlSplit = snapshotSplits.get(i);
List<SourceRecord> sourceRecords = pollRecordsFromReader(snapshotSplitReader, sqlSplit);
snapshotRecords.addAll(sourceRecords);
// mock binlog event after read chunk1
if (i == 0) {
mySqlConnection.execute(
"INSERT INTO "
+ customerDatabaseNoGtid.qualifiedTableName("customers")
+ " VALUES(999999, 'user_22','Shanghai','123567891234')");
}
}
snapshotSplitReader.close();
// step-2: create binlog split reader
List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
getFinishedSplitsInfo(snapshotSplits, snapshotRecords);
BinlogOffset startingOffset = getStartingOffsetOfBinlogSplit(finishedSplitsInfo);
Map<TableId, TableChange> tableSchemas = new HashMap<>();
for (MySqlSplit mySqlSplit : snapshotSplits) {
tableSchemas.putAll(mySqlSplit.getTableSchemas());
}
MySqlSplit binlogSplit =
new MySqlBinlogSplit(
"binlog-split",
startingOffset,
BinlogOffset.ofNonStopping(),
finishedSplitsInfo,
tableSchemas,
finishedSplitsInfo.size());
// step-3: test read binlog split
BinlogSplitReader binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
binlogReader.submitSplit(binlogSplit);
List<SourceRecord> sourceRecords =
pollRecordsFromReader(binlogReader, RecordUtils::isDataChangeRecord);
MYSQL_CONTAINER_NOGTID.stop();
assertTrue(sourceRecords.isEmpty());
}
private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) { private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
return createBinlogReader(sourceConfig, false); return createBinlogReader(sourceConfig, false);
} }
@ -945,6 +1013,29 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
return records; return records;
} }
private List<SourceRecord> pollRecordsFromReader(
SnapshotSplitReader reader, MySqlSplit sqlSplit) {
List<SourceRecord> records = new ArrayList<>();
if (reader.isFinished()) {
reader.submitSplit(sqlSplit);
}
Iterator<SourceRecords> res;
try {
while ((res = reader.pollSplitRecords()) != null) {
while (res.hasNext()) {
Iterator<SourceRecord> iterator = res.next().iterator();
while (iterator.hasNext()) {
SourceRecord sourceRecord = iterator.next();
records.add(sourceRecord);
}
}
}
} catch (InterruptedException e) {
throw new RuntimeException("Polling action was interrupted", e);
}
return records;
}
private List<String> readBinlogSplits( private List<String> readBinlogSplits(
DataType dataType, BinlogSplitReader reader, int expectedSize) { DataType dataType, BinlogSplitReader reader, int expectedSize) {
List<String> actual = new ArrayList<>(); List<String> actual = new ArrayList<>();
@ -1173,10 +1264,10 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
} }
private List<MySqlSnapshotSplit> getMySqlSplits( private List<MySqlSnapshotSplit> getMySqlSplits(
String[] captureTables, MySqlSourceConfig sourceConfig) { String[] captureTables, MySqlSourceConfig sourceConfig, UniqueDatabase database) {
List<String> captureTableIds = List<String> captureTableIds =
Arrays.stream(captureTables) Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName) .map(tableName -> database.getDatabaseName() + "." + tableName)
.collect(Collectors.toList()); .collect(Collectors.toList());
List<TableId> remainingTables = List<TableId> remainingTables =
captureTableIds.stream().map(TableId::parse).collect(Collectors.toList()); captureTableIds.stream().map(TableId::parse).collect(Collectors.toList());
@ -1202,6 +1293,11 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
return mySqlSplits; return mySqlSplits;
} }
private List<MySqlSnapshotSplit> getMySqlSplits(
String[] captureTables, MySqlSourceConfig sourceConfig) {
return getMySqlSplits(captureTables, sourceConfig, customerDatabase);
}
private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] captureTables) { private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] captureTables) {
return getConfig(MYSQL_CONTAINER, customerDatabase, startupOptions, captureTables); return getConfig(MYSQL_CONTAINER, customerDatabase, startupOptions, captureTables);
} }

Loading…
Cancel
Save