[mysql] Use event filter to drop events earlier than the specified timestamp in timestamp startup mode (#1724)

pull/1741/head
Qingsheng Ren 2 years ago committed by Leonard Xu
parent f1499f593a
commit f1ea59baeb

@ -22,9 +22,12 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
@ -54,6 +57,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey;
@ -111,7 +115,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
statefulTaskContext.getTaskContext(),
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
currentBinlogSplit);
currentBinlogSplit,
createEventFilter(currentBinlogSplit.getStartingOffset()));
executorService.submit(
() -> {
@ -299,6 +304,21 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
this.pureBinlogPhaseTables.clear();
}
private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
// If the startup mode is set as TIMESTAMP, we need to apply a filter on event to drop
// events earlier than the specified timestamp.
if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
long startTimestampSec = startingOffset.getTimestampSec();
// Notes:
// 1. Heartbeat event doesn't contain timestamp, so we just keep it
// 2. Timestamp of event is in epoch millisecond
return event ->
EventType.HEARTBEAT.equals(event.getHeader().getEventType())
|| event.getHeader().getTimestamp() >= startTimestampSec * 1000;
}
return event -> true;
}
public void stopBinlogReadTask() {
this.currentTaskRunning = false;
}

@ -212,7 +212,8 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
statefulTaskContext.getTaskContext(),
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
backfillBinlogSplit);
backfillBinlogSplit,
event -> true);
}
private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit)

@ -35,6 +35,8 @@ import io.debezium.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Predicate;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.isNonStoppingOffset;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
@ -49,6 +51,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
private final EventDispatcherImpl<TableId> eventDispatcher;
private final SignalEventDispatcher signalEventDispatcher;
private final ErrorHandler errorHandler;
private final Predicate<Event> eventFilter;
private ChangeEventSourceContext context;
public MySqlBinlogSplitReadTask(
@ -60,12 +63,14 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
Clock clock,
MySqlTaskContext taskContext,
MySqlStreamingChangeEventSourceMetrics metrics,
MySqlBinlogSplit binlogSplit) {
MySqlBinlogSplit binlogSplit,
Predicate<Event> eventFilter) {
super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics);
this.binlogSplit = binlogSplit;
this.eventDispatcher = dispatcher;
this.errorHandler = errorHandler;
this.signalEventDispatcher = signalEventDispatcher;
this.eventFilter = eventFilter;
}
@Override
@ -77,6 +82,9 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
@Override
protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
if (!eventFilter.test(event)) {
return;
}
super.handleEvent(offsetContext, event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {

@ -1,56 +0,0 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.debezium.task.context;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.spi.ChangeEventCreator;
/** Factory class for creating {@link ChangeEventCreator}. */
public class ChangeEventCreatorFactory {
/**
* Create a {@link ChangeEventCreator} according to the assigned split.
*
* <p>If a binlog split is assigned, and the starting binlog offset has {@link
* BinlogOffsetKind#TIMESTAMP} kind, we need to apply a filter onto the creator, which drops
* events earlier than the specified timestamp.
*
* @param split the assigned split
*/
public static ChangeEventCreator createChangeEventCreator(MySqlSplit split) {
if (split.isBinlogSplit()
&& split.asBinlogSplit()
.getStartingOffset()
.getOffsetKind()
.equals(BinlogOffsetKind.TIMESTAMP)) {
long startTimestampSec = split.asBinlogSplit().getStartingOffset().getTimestampSec();
return sourceRecord -> {
Long messageTimestampMs = RecordUtils.getMessageTimestamp(sourceRecord);
if (messageTimestampMs != null && messageTimestampMs / 1000 >= startTimestampSec) {
return new DataChangeEvent(sourceRecord);
} else {
return null;
}
};
}
return DataChangeEvent::new;
}
}

@ -56,7 +56,6 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import static com.ververica.cdc.connectors.mysql.debezium.task.context.ChangeEventCreatorFactory.createChangeEventCreator;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
@ -145,7 +144,7 @@ public class StatefulTaskContext {
databaseSchema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
createChangeEventCreator(mySqlSplit),
DataChangeEvent::new,
metadataProvider,
schemaNameAdjuster);

@ -17,7 +17,6 @@
package com.ververica.cdc.connectors.mysql.source.offset;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.debezium.task.context.ChangeEventCreatorFactory;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import io.debezium.connector.mysql.MySqlConnection;
@ -39,8 +38,7 @@ public class BinlogOffsetUtils {
* <ul>
* <li>EARLIEST: binlog filename = "", position = 0
* <li>TIMESTAMP: set to earliest, as the current implementation is reading from the earliest
* offset and drop events earlier than the specified timestamp. See {@link
* ChangeEventCreatorFactory#createChangeEventCreator}.
* offset and drop events earlier than the specified timestamp.
* <li>LATEST: fetch the current binlog by JDBC
* </ul>
*/

@ -572,6 +572,64 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
assertEqualsInOrder(Arrays.asList(expected), actual);
}
@Test
public void testReadBinlogFromTimestampAfterSchemaChange() throws Exception {
// Preparations
customerDatabase.createAndInitialize();
MySqlSourceConfig connectionConfig = getConfig(new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);
DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()),
DataTypes.FIELD("new_int_column", DataTypes.INT()));
String tableId = customerDatabase.qualifiedTableName("customers");
// Add a new column to the table
addColumnToTable(mySqlConnection, tableId);
// Unfortunately we need this sleep here to make sure we could differ the coming binlog
// events from existing events by timestamp.
Thread.sleep(2000);
// Capture current timestamp now
long startTimestamp = System.currentTimeMillis();
// Create a new config to start reading from the offset captured above
MySqlSourceConfig sourceConfig =
getConfig(StartupOptions.timestamp(startTimestamp), new String[] {"customers"});
// Create reader and submit splits
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
BinlogSplitReader reader = createBinlogReader(sourceConfig);
reader.submitSplit(split);
// Create some binlog events
mySqlConnection.execute(
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
"DELETE FROM " + tableId + " where id = 102",
"INSERT INTO "
+ tableId
+ " VALUES(102, 'user_2','Shanghai','123567891234', 15213)",
"UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
// Read with binlog split reader and validate
String[] expected =
new String[] {
"-U[103, user_3, Shanghai, 123567891234, 15213]",
"+U[103, user_3, Hangzhou, 123567891234, 15213]",
"-D[102, user_2, Shanghai, 123567891234, 15213]",
"+I[102, user_2, Shanghai, 123567891234, 15213]",
"-U[103, user_3, Hangzhou, 123567891234, 15213]",
"+U[103, user_3, Shanghai, 123567891234, 15213]",
};
List<String> actual = readBinlogSplits(dataType, reader, expected.length);
assertEqualsInOrder(Arrays.asList(expected), actual);
}
@Test
public void testHeartbeatEvent() throws Exception {
// Initialize database
@ -940,4 +998,9 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
.fetchSize(2)
.password(customerDatabase.getPassword());
}
private void addColumnToTable(JdbcConnection connection, String tableId) throws Exception {
connection.execute(
"ALTER TABLE " + tableId + " ADD COLUMN new_int_column INT DEFAULT 15213");
}
}

Loading…
Cancel
Save