[FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file (#3432)

pull/3511/head
Thorne 6 months ago committed by GitHub
parent 0c49959f67
commit 986f37b307
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -240,12 +240,15 @@ public class DebeziumUtils {
return variables;
}
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
public static BinlogOffset findBinlogOffset(
long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());
if (mySqlSourceConfig.getServerIdRange() != null) {
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
}
List<String> binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {

@ -193,7 +193,9 @@ public class StatefulTaskContext {
mySqlSplit.isSnapshotSplit()
? BinlogOffset.ofEarliest()
: initializeEffectiveOffset(
mySqlSplit.asBinlogSplit().getStartingOffset(), connection);
mySqlSplit.asBinlogSplit().getStartingOffset(),
connection,
sourceConfig);
LOG.info("Starting offset is initialized to {}", offset);

@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.mysql.source.offset;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import io.debezium.connector.mysql.MySqlConnection;
@ -45,13 +46,14 @@ public class BinlogOffsetUtils {
* </ul>
*/
public static BinlogOffset initializeEffectiveOffset(
BinlogOffset offset, MySqlConnection connection) {
BinlogOffset offset, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
BinlogOffsetKind offsetKind = offset.getOffsetKind();
switch (offsetKind) {
case EARLIEST:
return BinlogOffset.ofBinlogFilePosition("", 0);
case TIMESTAMP:
return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
return DebeziumUtils.findBinlogOffset(
offset.getTimestampSec() * 1000, connection, mySqlSourceConfig);
case LATEST:
return DebeziumUtils.currentBinlogOffset(connection);
default:

@ -1262,7 +1262,8 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
? BinlogOffset.ofEarliest()
: initializeEffectiveOffset(
mySqlSplit.asBinlogSplit().getStartingOffset(),
getConnection());
getConnection(),
getSourceConfig());
LOG.info("Starting offset is initialized to {}", offset);

@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.mysql.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
@ -263,9 +264,12 @@ public class SpecificStartingOffsetITCase {
// Purge binary log at first
purgeBinaryLogs();
long t0 = System.currentTimeMillis();
String servedId0 = "5400";
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000004", 0),
DebeziumUtils.findBinlogOffset(System.currentTimeMillis(), connection));
DebeziumUtils.findBinlogOffset(
t0, connection, getMySqlSourceConfig(t0, servedId0)));
executeStatements(
String.format(
@ -273,6 +277,7 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t1 = System.currentTimeMillis();
String servedId1 = "5401";
flushLogs();
executeStatements(
@ -281,6 +286,7 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t2 = System.currentTimeMillis();
String servedId2 = "5402";
flushLogs();
executeStatements(
@ -289,6 +295,7 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t3 = System.currentTimeMillis();
String servedId3 = "5403";
flushLogs();
executeStatements(
@ -297,6 +304,7 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t4 = System.currentTimeMillis();
String servedId4 = "5404";
flushLogs();
executeStatements(
@ -305,28 +313,35 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t5 = System.currentTimeMillis();
String servedId5 = "5405";
flushLogs();
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000005", 0),
DebeziumUtils.findBinlogOffset(t1, connection));
DebeziumUtils.findBinlogOffset(
t1, connection, getMySqlSourceConfig(t1, servedId1)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000006", 0),
DebeziumUtils.findBinlogOffset(t2, connection));
DebeziumUtils.findBinlogOffset(
t2, connection, getMySqlSourceConfig(t1, servedId2)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000007", 0),
DebeziumUtils.findBinlogOffset(t3, connection));
DebeziumUtils.findBinlogOffset(
t3, connection, getMySqlSourceConfig(t1, servedId3)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000008", 0),
DebeziumUtils.findBinlogOffset(t4, connection));
DebeziumUtils.findBinlogOffset(
t4, connection, getMySqlSourceConfig(t1, servedId4)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0),
DebeziumUtils.findBinlogOffset(t5, connection));
DebeziumUtils.findBinlogOffset(
t5, connection, getMySqlSourceConfig(t1, servedId5)));
purgeBinaryLogs();
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0),
DebeziumUtils.findBinlogOffset(t3, connection));
DebeziumUtils.findBinlogOffset(
t5, connection, getMySqlSourceConfig(t1, servedId5)));
}
@Test
@ -440,6 +455,15 @@ public class SpecificStartingOffsetITCase {
return DebeziumUtils.createMySqlConnection(configuration, new Properties());
}
private MySqlSourceConfig getMySqlSourceConfig(Long timestamp, String serverId) {
return getSourceBuilder()
.startupOptions(StartupOptions.timestamp(timestamp))
.serverId(serverId)
.build()
.getConfigFactory()
.createConfig(0);
}
private void executeStatements(String... statements) throws Exception {
connection.execute(statements);
connection.commit();

Loading…
Cancel
Save