[source-connector][mysql] Speed up searching binlog by timestamp (#2924)

* [feature] Speed up searching the MySQL binlog when mysql-cdc-connector consumes the binlog by timestamp

* improve code

* improve code

* code improve

* test case add and code improve

* code improve

* code improve

* improve code, add unit test, and fix bug for purged logs

---------

Co-authored-by: TJX2014 <tangjinxin@deepexi.com>
pull/2957/head
FocusComputing 1 year ago committed by GitHub
parent 7ab1e87842
commit b42bc60ab9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -19,6 +19,9 @@ package com.ververica.cdc.connectors.mysql.debezium;
import org.apache.flink.util.FlinkRuntimeException;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
@ -41,11 +44,14 @@ import io.debezium.util.SchemaNameAdjuster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Predicate;
import static com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables;
@ -233,4 +239,96 @@ public class DebeziumUtils {
return variables;
}
/**
 * Finds the binlog file from which reading should start for a given timestamp.
 *
 * <p>The available files are listed with {@code SHOW BINARY LOGS}; entries reporting a size of
 * zero are ignored. The remaining files are then searched by {@code searchBinlogName}, and the
 * chosen file is returned with position {@code 0}.
 *
 * @param targetMs the timestamp to search for, in milliseconds
 * @param connection the connection used to list binlog files; its configuration (host, port,
 *     user, password) is also used to open the binlog client
 * @return an offset at position 0 of the selected binlog file, or an offset with an empty file
 *     name and position 0 when no non-empty binlog file is listed
 * @throws FlinkRuntimeException if listing or reading the binlog files fails, or if the calling
 *     thread is interrupted while searching
 */
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
    MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
    BinaryLogClient client =
            new BinaryLogClient(
                    config.hostname(), config.port(), config.username(), config.password());
    List<String> binlogFiles = new ArrayList<>();
    JdbcConnection.ResultSetConsumer rsc =
            rs -> {
                while (rs.next()) {
                    String fileName = rs.getString(1);
                    long fileSize = rs.getLong(2);
                    // Only files that actually contain data are worth probing.
                    if (fileSize > 0) {
                        binlogFiles.add(fileName);
                    }
                }
            };
    try {
        connection.query("SHOW BINARY LOGS", rsc);
        LOG.info("Total search binlog: {}", binlogFiles);
        if (binlogFiles.isEmpty()) {
            return BinlogOffset.ofBinlogFilePosition("", 0);
        }
        String binlogName = searchBinlogName(client, targetMs, binlogFiles);
        return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still observe the
        // interruption after it has been wrapped in an unchecked exception.
        Thread.currentThread().interrupt();
        throw new FlinkRuntimeException(e);
    } catch (Exception e) {
        throw new FlinkRuntimeException(e);
    }
}
/**
 * Binary-searches the given binlog files for the one to start reading from.
 *
 * <p>Each probed file is represented by the timestamp returned from {@code
 * getBinlogTimestamp}. A file whose timestamp equals {@code targetMs} is returned immediately.
 * Otherwise the result is the last file whose timestamp is smaller than {@code targetMs}, or
 * the first file in the list when every probed timestamp is larger.
 *
 * @param client the binlog client used to probe each candidate file
 * @param targetMs the timestamp to search for, in milliseconds
 * @param binlogFiles the candidate binlog file names, in the order they should be searched
 * @return the name of the selected binlog file
 * @throws IOException if probing a binlog file fails
 * @throws InterruptedException if probing a binlog file is interrupted
 */
private static String searchBinlogName(
        BinaryLogClient client, long targetMs, List<String> binlogFiles)
        throws IOException, InterruptedException {
    int low = 0;
    int high = binlogFiles.size() - 1;

    while (low <= high) {
        final int probe = low + (high - low) / 2;
        final long probeTs = getBinlogTimestamp(client, binlogFiles.get(probe));

        if (probeTs == targetMs) {
            return binlogFiles.get(probe);
        }
        if (probeTs < targetMs) {
            low = probe + 1;
        } else {
            high = probe - 1;
        }
    }

    // high now points at the last file that starts before the target; clamp it to the first
    // file when the target precedes every probed file.
    return binlogFiles.get(Math.max(high, 0));
}
/**
 * Returns the timestamp of the first event in the given binlog file that is not a rotate event
 * and carries a positive timestamp.
 *
 * <p>The client is pointed at position 0 of {@code binlogFile} and connected; a temporary
 * listener captures the first qualifying event header timestamp and then disconnects the
 * client. The listener is always unregistered before this method returns.
 *
 * @param client the binlog client to use; its binlog file name and position are overwritten
 * @param binlogFile the name of the binlog file to inspect
 * @return the captured event timestamp, as reported by the event header
 * @throws IOException if connecting fails, or if the connection ended without any qualifying
 *     event having been observed
 * @throws InterruptedException kept in the signature for compatibility with existing callers
 */
private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile)
        throws IOException, InterruptedException {
    ArrayBlockingQueue<Long> binlogTimestamps = new ArrayBlockingQueue<>(1);
    BinaryLogClient.EventListener eventListener =
            event -> {
                EventData data = event.getData();
                if (data instanceof RotateEventData) {
                    // We skip RotateEventData because it does not contain the timestamp we are
                    // interested in.
                    return;
                }
                EventHeaderV4 header = event.getHeader();
                long timestamp = header.getTimestamp();
                if (timestamp > 0) {
                    binlogTimestamps.offer(timestamp);
                    // One timestamp is all we need; stop streaming so connect() can return.
                    try {
                        client.disconnect();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
    try {
        client.registerEventListener(eventListener);
        client.setBinlogFilename(binlogFile);
        client.setBinlogPosition(0);
        LOG.info("begin parse binlog: {}", binlogFile);
        client.connect();
    } finally {
        client.unregisterEventListener(eventListener);
    }
    // The listener has been unregistered, so nothing can add to the queue any more. A blocking
    // take() on an empty queue would therefore wait forever; fail fast instead.
    Long firstTimestamp = binlogTimestamps.poll();
    if (firstTimestamp == null) {
        throw new IOException(
                "Connection ended before any timestamped event was read from binlog file "
                        + binlogFile);
    }
    return firstTimestamp;
}
}

@ -47,8 +47,9 @@ public class BinlogOffsetUtils {
BinlogOffsetKind offsetKind = offset.getOffsetKind();
switch (offsetKind) {
case EARLIEST:
case TIMESTAMP:
return BinlogOffset.ofBinlogFilePosition("", 0);
case TIMESTAMP:
return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
case LATEST:
return DebeziumUtils.currentBinlogOffset(connection);
default:

@ -47,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.rules.TemporaryFolder;
import org.locationtech.jts.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@ -256,6 +257,77 @@ public class SpecificStartingOffsetITCase {
restoredJobClient.cancel().get();
}
/**
 * Checks that {@code DebeziumUtils.findBinlogOffset} maps timestamps to the expected binlog
 * file: once right after purging, then for five timestamps each taken just before a log flush,
 * and finally for an earlier timestamp after the logs have been purged again.
 */
@Test
void testBinlogSplitFromTimestampOffset() throws Exception {
    // Purge binary log at first
    purgeBinaryLogs();
    Assert.equals(
            BinlogOffset.ofBinlogFilePosition("mysql-bin.000004", 0),
            DebeziumUtils.findBinlogOffset(System.currentTimeMillis(), connection));

    final String[] insertTemplates = {
        "INSERT INTO %s VALUES (15213, 'Alice', 'Rome', '123456987');",
        "INSERT INTO %s VALUES (15513, 'Bob', 'Milan', '123456987');",
        "INSERT INTO %s VALUES (18213, 'Charlie', 'Paris', '123456987');",
        "INSERT INTO %s VALUES (19613, 'Tom', 'NewYork', '123456987');",
        "INSERT INTO %s VALUES (20913, 'Cat', 'Washington', '123456987');"
    };
    final String[] expectedBinlogs = {
        "mysql-bin.000005",
        "mysql-bin.000006",
        "mysql-bin.000007",
        "mysql-bin.000008",
        "mysql-bin.000009"
    };
    final long[] checkpoints = new long[insertTemplates.length];

    // For every row: insert it, wait a second, record the wall-clock time, then flush the logs.
    for (int i = 0; i < insertTemplates.length; i++) {
        executeStatements(String.format(insertTemplates[i], customers.getTableId()));
        Thread.sleep(1000);
        checkpoints[i] = System.currentTimeMillis();
        flushLogs();
    }

    // Each recorded timestamp is expected to resolve to its own binlog file.
    for (int i = 0; i < expectedBinlogs.length; i++) {
        Assert.equals(
                BinlogOffset.ofBinlogFilePosition(expectedBinlogs[i], 0),
                DebeziumUtils.findBinlogOffset(checkpoints[i], connection));
    }

    // After purging again, the third timestamp is expected to resolve to mysql-bin.000009.
    purgeBinaryLogs();
    Assert.equals(
            BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0),
            DebeziumUtils.findBinlogOffset(checkpoints[2], connection));
}
@Test
void testStartingFromTimestampOffset() throws Exception {
// Purge binary log at first
@ -275,6 +347,9 @@ public class SpecificStartingOffsetITCase {
"INSERT INTO %s VALUES (18213, 'Charlie', 'Paris', '123456987');",
customers.getTableId()));
// switch new log
flushLogs();
// Record current timestamp
Thread.sleep(1000);
StartupOptions startupOptions = StartupOptions.timestamp(System.currentTimeMillis());

Loading…
Cancel
Save