diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 7f7ec2bba..1f5b73c42 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -22,9 +22,12 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventType; import com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask; import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind; import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; @@ -54,6 +57,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey; @@ -111,7 +115,8 @@ public class BinlogSplitReader implements DebeziumReader { @@ -299,6 +304,21 @@ public class BinlogSplitReader implements DebeziumReader createEventFilter(BinlogOffset startingOffset) { + // If the startup mode is set as TIMESTAMP, we need to apply a 
filter on event to drop + // events earlier than the specified timestamp. + if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) { + long startTimestampSec = startingOffset.getTimestampSec(); + // Notes: + // 1. Heartbeat events don't contain a timestamp, so we just keep them + // 2. The timestamp of an event is in epoch milliseconds + return event -> + EventType.HEARTBEAT.equals(event.getHeader().getEventType()) + || event.getHeader().getTimestamp() >= startTimestampSec * 1000; + } + return event -> true; + } + public void stopBinlogReadTask() { this.currentTaskRunning = false; } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index 3f91a6043..6dd266174 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -212,7 +212,8 @@ public class SnapshotSplitReader implements DebeziumReader true); } private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java index 88c46e5b1..e7a81783f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java @@ -35,6 +35,8 @@ import io.debezium.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.Predicate; + import static 
com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.isNonStoppingOffset; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition; @@ -49,6 +51,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource { private final EventDispatcherImpl eventDispatcher; private final SignalEventDispatcher signalEventDispatcher; private final ErrorHandler errorHandler; + private final Predicate eventFilter; private ChangeEventSourceContext context; public MySqlBinlogSplitReadTask( @@ -60,12 +63,14 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource { Clock clock, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics, - MySqlBinlogSplit binlogSplit) { + MySqlBinlogSplit binlogSplit, + Predicate eventFilter) { super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics); this.binlogSplit = binlogSplit; this.eventDispatcher = dispatcher; this.errorHandler = errorHandler; this.signalEventDispatcher = signalEventDispatcher; + this.eventFilter = eventFilter; } @Override @@ -77,6 +82,9 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource { @Override protected void handleEvent(MySqlOffsetContext offsetContext, Event event) { + if (!eventFilter.test(event)) { + return; + } super.handleEvent(offsetContext, event); // check do we need to stop for read binlog for snapshot split. 
if (isBoundedRead()) { diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/ChangeEventCreatorFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/ChangeEventCreatorFactory.java deleted file mode 100644 index da7f13be7..000000000 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/ChangeEventCreatorFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2022 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.mysql.debezium.task.context; - -import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind; -import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; -import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils; -import io.debezium.pipeline.DataChangeEvent; -import io.debezium.pipeline.spi.ChangeEventCreator; - -/** Factory class for creating {@link ChangeEventCreator}. */ -public class ChangeEventCreatorFactory { - - /** - * Create a {@link ChangeEventCreator} according to the assigned split. - * - *

If a binlog split is assigned, and the starting binlog offset has {@link - * BinlogOffsetKind#TIMESTAMP} kind, we need to apply a filter onto the creator, which drops - * events earlier than the specified timestamp. - * - * @param split the assigned split - */ - public static ChangeEventCreator createChangeEventCreator(MySqlSplit split) { - if (split.isBinlogSplit() - && split.asBinlogSplit() - .getStartingOffset() - .getOffsetKind() - .equals(BinlogOffsetKind.TIMESTAMP)) { - long startTimestampSec = split.asBinlogSplit().getStartingOffset().getTimestampSec(); - return sourceRecord -> { - Long messageTimestampMs = RecordUtils.getMessageTimestamp(sourceRecord); - if (messageTimestampMs != null && messageTimestampMs / 1000 >= startTimestampSec) { - return new DataChangeEvent(sourceRecord); - } else { - return null; - } - }; - } - - return DataChangeEvent::new; - } -} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index 715b76cc8..fd1ade927 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -56,7 +56,6 @@ import java.time.Instant; import java.util.List; import java.util.Map; -import static com.ververica.cdc.connectors.mysql.debezium.task.context.ChangeEventCreatorFactory.createChangeEventCreator; import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY; import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset; @@ -145,7 +144,7 @@ public class StatefulTaskContext { databaseSchema, queue, connectorConfig.getTableFilters().dataCollectionFilter(), - 
createChangeEventCreator(mySqlSplit), + DataChangeEvent::new, metadataProvider, schemaNameAdjuster); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java index 9eeec81d9..3c2444729 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java @@ -17,7 +17,6 @@ package com.ververica.cdc.connectors.mysql.source.offset; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; -import com.ververica.cdc.connectors.mysql.debezium.task.context.ChangeEventCreatorFactory; import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; import io.debezium.connector.mysql.MySqlConnection; @@ -39,8 +38,7 @@ public class BinlogOffsetUtils { *

    *
  • EARLIEST: binlog filename = "", position = 0 *
  • TIMESTAMP: set to earliest, as the current implementation is reading from the earliest - * offset and drop events earlier than the specified timestamp. See {@link - * ChangeEventCreatorFactory#createChangeEventCreator}. + * offset and drop events earlier than the specified timestamp. *
  • LATEST: fetch the current binlog by JDBC *
*/ diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 570b39167..e1f776bf9 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -572,6 +572,64 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase { assertEqualsInOrder(Arrays.asList(expected), actual); } + @Test + public void testReadBinlogFromTimestampAfterSchemaChange() throws Exception { + // Preparations + customerDatabase.createAndInitialize(); + MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"}); + binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); + mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("address", DataTypes.STRING()), + DataTypes.FIELD("phone_number", DataTypes.STRING()), + DataTypes.FIELD("new_int_column", DataTypes.INT())); + String tableId = customerDatabase.qualifiedTableName("customers"); + + // Add a new column to the table + addColumnToTable(mySqlConnection, tableId); + + // Unfortunately we need this sleep here to make sure we can distinguish the incoming binlog + // events from existing events by timestamp. 
+ Thread.sleep(2000); + + // Capture current timestamp now + long startTimestamp = System.currentTimeMillis(); + + // Create a new config to start reading from the offset captured above + MySqlSourceConfig sourceConfig = + getConfig(StartupOptions.timestamp(startTimestamp), new String[] {"customers"}); + + // Create reader and submit splits + MySqlBinlogSplit split = createBinlogSplit(sourceConfig); + BinlogSplitReader reader = createBinlogReader(sourceConfig); + reader.submitSplit(split); + + // Create some binlog events + mySqlConnection.execute( + "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", + "DELETE FROM " + tableId + " where id = 102", + "INSERT INTO " + + tableId + + " VALUES(102, 'user_2','Shanghai','123567891234', 15213)", + "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"); + + // Read with binlog split reader and validate + String[] expected = + new String[] { + "-U[103, user_3, Shanghai, 123567891234, 15213]", + "+U[103, user_3, Hangzhou, 123567891234, 15213]", + "-D[102, user_2, Shanghai, 123567891234, 15213]", + "+I[102, user_2, Shanghai, 123567891234, 15213]", + "-U[103, user_3, Hangzhou, 123567891234, 15213]", + "+U[103, user_3, Shanghai, 123567891234, 15213]", + }; + List actual = readBinlogSplits(dataType, reader, expected.length); + assertEqualsInOrder(Arrays.asList(expected), actual); + } + @Test public void testHeartbeatEvent() throws Exception { // Initialize database @@ -940,4 +998,9 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase { .fetchSize(2) .password(customerDatabase.getPassword()); } + + private void addColumnToTable(JdbcConnection connection, String tableId) throws Exception { + connection.execute( + "ALTER TABLE " + tableId + " ADD COLUMN new_int_column INT DEFAULT 15213"); + } }