diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index f729f59d0..492df8262 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -154,6 +154,11 @@ public class MySqlSnapshotSplitReadTask hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit); } final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); + lowWatermark + .getOffset() + .put( + BinlogOffset.TIMESTAMP_KEY, + String.valueOf(clock.currentTime().getEpochSecond())); LOG.info( "Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, @@ -187,6 +192,11 @@ public class MySqlSnapshotSplitReadTask } else { // Get the current binlog offset as HW highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); + highWatermark + .getOffset() + .put( + BinlogOffset.TIMESTAMP_KEY, + String.valueOf(clock.currentTime().getEpochSecond())); } LOG.info( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java index af7948ac9..b2aa30bb3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java @@ -251,7 +251,12 @@ public class BinlogOffset implements Comparable, Serializable { } // The completed events are the same, so compare the row number ... - return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows()); + if (this.getRestartSkipRows() != that.getRestartSkipRows()) { + return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows()); + } + + // The skip rows are the same, so compare the timestamp ... + return Long.compare(this.getTimestampSec(), that.getTimestampSec()); } public boolean isAtOrBefore(BinlogOffset that) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 17ade8f0f..6a0df9069 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -107,6 +107,11 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase { private final UniqueDatabase inventoryDatabase8 = new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD); + private static final MySqlContainer MYSQL_CONTAINER_NOGTID = + createMySqlContainer(MySqlVersion.V5_7, "docker/server/my.cnf"); + private final UniqueDatabase customerDatabaseNoGtid = + new UniqueDatabase(MYSQL_CONTAINER_NOGTID, "customer", TEST_USER, TEST_PASSWORD); + private BinaryLogClient binaryLogClient; private MySqlConnection mySqlConnection; @@ -891,6 +896,69 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase { assertThat(eventFilter.test(event)).isFalse(); } + @Test + public void testBinlogOffsetCompareWithSnapshotAndBinlogPhase() throws Exception { + Startables.deepStart(Stream.of(MYSQL_CONTAINER_NOGTID)).join(); + // Preparations + customerDatabaseNoGtid.createAndInitialize(); + MySqlSourceConfig sourceConfig = + getConfig( + MYSQL_CONTAINER_NOGTID, customerDatabaseNoGtid, new String[] {"customers"}); + binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); + mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + + // step-1: split snapshot + List snapshotSplits = + getMySqlSplits(new String[] {"customers"}, sourceConfig, customerDatabaseNoGtid); + + final StatefulTaskContext statefulTaskContext = + new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection); + final SnapshotSplitReader snapshotSplitReader = + new SnapshotSplitReader(statefulTaskContext, 0); + + // step-1: read snapshot splits firstly + List snapshotRecords = new ArrayList<>(); + for (int i = 0; i < snapshotSplits.size(); i++) { + MySqlSplit sqlSplit = snapshotSplits.get(i); + List sourceRecords = pollRecordsFromReader(snapshotSplitReader, sqlSplit); + snapshotRecords.addAll(sourceRecords); + // mock binlog event after read chunk1 + if (i == 0) { + mySqlConnection.execute( + "INSERT INTO " + + customerDatabaseNoGtid.qualifiedTableName("customers") + + " VALUES(999999, 'user_22','Shanghai','123567891234')"); + } + } + snapshotSplitReader.close(); + + // step-2: create binlog split reader + List finishedSplitsInfo = + getFinishedSplitsInfo(snapshotSplits, snapshotRecords); + BinlogOffset startingOffset = getStartingOffsetOfBinlogSplit(finishedSplitsInfo); + Map tableSchemas = new HashMap<>(); + for (MySqlSplit mySqlSplit : snapshotSplits) { + tableSchemas.putAll(mySqlSplit.getTableSchemas()); + } + MySqlSplit binlogSplit = + new MySqlBinlogSplit( + "binlog-split", + startingOffset, + BinlogOffset.ofNonStopping(), + finishedSplitsInfo, + tableSchemas, + finishedSplitsInfo.size()); + + // step-3: test read binlog split + BinlogSplitReader binlogReader = new BinlogSplitReader(statefulTaskContext, 0); + binlogReader.submitSplit(binlogSplit); + + List sourceRecords = + pollRecordsFromReader(binlogReader, RecordUtils::isDataChangeRecord); + MYSQL_CONTAINER_NOGTID.stop(); + assertTrue(sourceRecords.isEmpty()); + } + private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) { return createBinlogReader(sourceConfig, false); } @@ -945,6 +1013,29 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase { return records; } + private List pollRecordsFromReader( + SnapshotSplitReader reader, MySqlSplit sqlSplit) { + List records = new ArrayList<>(); + if (reader.isFinished()) { + reader.submitSplit(sqlSplit); + } + Iterator res; + try { + while ((res = reader.pollSplitRecords()) != null) { + while (res.hasNext()) { + Iterator iterator = res.next().iterator(); + while (iterator.hasNext()) { + SourceRecord sourceRecord = iterator.next(); + records.add(sourceRecord); + } + } + } + } catch (InterruptedException e) { + throw new RuntimeException("Polling action was interrupted", e); + } + return records; + } + private List readBinlogSplits( DataType dataType, BinlogSplitReader reader, int expectedSize) { List actual = new ArrayList<>(); @@ -1173,10 +1264,10 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase { } private List getMySqlSplits( - String[] captureTables, MySqlSourceConfig sourceConfig) { + String[] captureTables, MySqlSourceConfig sourceConfig, UniqueDatabase database) { List captureTableIds = Arrays.stream(captureTables) - .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName) + .map(tableName -> database.getDatabaseName() + "." + tableName) .collect(Collectors.toList()); List remainingTables = captureTableIds.stream().map(TableId::parse).collect(Collectors.toList()); @@ -1202,6 +1293,11 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase { return mySqlSplits; } + private List getMySqlSplits( + String[] captureTables, MySqlSourceConfig sourceConfig) { + return getMySqlSplits(captureTables, sourceConfig, customerDatabase); + } + private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] captureTables) { return getConfig(MYSQL_CONTAINER, customerDatabase, startupOptions, captureTables); }