From 95af1a53052b893d264e7c83d9ac1a4dca982661 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Tue, 26 Sep 2023 17:29:42 +0800 Subject: [PATCH] [mysql] remove redundant source metrics processTime and emitDelay in mysql source --- .../metrics/MySqlSourceReaderMetrics.java | 39 ++----------------- .../source/reader/MySqlRecordEmitter.java | 7 +--- .../source/reader/MySqlSourceReaderTest.java | 5 --- 3 files changed, 5 insertions(+), 46 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java index 875496879..97c4d4291 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.source.metrics; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.MetricNames; import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader; @@ -26,60 +27,26 @@ public class MySqlSourceReaderMetrics { private final MetricGroup metricGroup; - /** - * The last record processing time, which is updated after {@link MySqlSourceReader} fetches a - * batch of data. It's mainly used to report metrics sourceIdleTime for sourceIdleTime = - * System.currentTimeMillis() - processTime. - */ - private volatile long processTime = 0L; - /** * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the * record fetched into the source operator. */ private volatile long fetchDelay = 0L; - /** - * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the - * source operator. - */ - private volatile long emitDelay = 0L; - public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; } public void registerMetrics() { - metricGroup.gauge("currentFetchEventTimeLag", (Gauge) this::getFetchDelay); - metricGroup.gauge("currentEmitEventTimeLag", (Gauge) this::getEmitDelay); - metricGroup.gauge("sourceIdleTime", (Gauge) this::getIdleTime); + metricGroup.gauge( + MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, (Gauge) this::getFetchDelay); } public long getFetchDelay() { return fetchDelay; } - public long getEmitDelay() { - return emitDelay; - } - - public long getIdleTime() { - // no previous process time at the beginning, return 0 as idle time - if (processTime == 0) { - return 0; - } - return System.currentTimeMillis() - processTime; - } - - public void recordProcessTime(long processTime) { - this.processTime = processTime; - } - public void recordFetchDelay(long fetchDelay) { this.fetchDelay = fetchDelay; } - - public void recordEmitDelay(long emitDelay) { - this.emitDelay = emitDelay; - } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index cbdaa042a..02e705ddc 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -130,19 +130,16 @@ public final class MySqlRecordEmitter } private void reportMetrics(SourceRecord element) { - long now = System.currentTimeMillis(); - // record the latest process time - sourceReaderMetrics.recordProcessTime(now); + Long messageTimestamp = getMessageTimestamp(element); if (messageTimestamp != null && messageTimestamp > 0L) { // report fetch delay Long fetchTimestamp = getFetchTimestamp(element); if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) { + // report fetch delay sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp); } - // report emit delay - sourceReaderMetrics.recordEmitDelay(now - messageTimestamp); } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 9f9d1f869..4f1df5225 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -684,9 +684,6 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase { } private void reportMetrics(SourceRecord element) { - long now = System.currentTimeMillis(); - // record the latest process time - sourceReaderMetrics.recordProcessTime(now); Long messageTimestamp = getMessageTimestamp(element); if (messageTimestamp != null && messageTimestamp > 0L) { @@ -695,8 +692,6 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase { if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) { sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp); } - // report emit delay - sourceReaderMetrics.recordEmitDelay(now - messageTimestamp); } }