[mysql] remove redundant source metrics processTime and emitDelay in mysql source

pull/2432/head
Hongshun Wang 1 year ago committed by Qingsheng Ren
parent ad927b7e79
commit 95af1a5305

@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.source.metrics;
import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader; import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
@ -26,60 +27,26 @@ public class MySqlSourceReaderMetrics {
private final MetricGroup metricGroup; private final MetricGroup metricGroup;
/**
* The last record processing time, which is updated after {@link MySqlSourceReader} fetches a
* batch of data. It's mainly used to report metrics sourceIdleTime for sourceIdleTime =
* System.currentTimeMillis() - processTime.
*/
private volatile long processTime = 0L;
/** /**
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator. * record fetched into the source operator.
*/ */
private volatile long fetchDelay = 0L; private volatile long fetchDelay = 0L;
/**
* emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
* source operator.
*/
private volatile long emitDelay = 0L;
public MySqlSourceReaderMetrics(MetricGroup metricGroup) { public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup; this.metricGroup = metricGroup;
} }
public void registerMetrics() { public void registerMetrics() {
metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) this::getFetchDelay); metricGroup.gauge(
metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) this::getEmitDelay); MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, (Gauge<Long>) this::getFetchDelay);
metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime);
} }
public long getFetchDelay() { public long getFetchDelay() {
return fetchDelay; return fetchDelay;
} }
public long getEmitDelay() {
return emitDelay;
}
public long getIdleTime() {
// no previous process time at the beginning, return 0 as idle time
if (processTime == 0) {
return 0;
}
return System.currentTimeMillis() - processTime;
}
public void recordProcessTime(long processTime) {
this.processTime = processTime;
}
public void recordFetchDelay(long fetchDelay) { public void recordFetchDelay(long fetchDelay) {
this.fetchDelay = fetchDelay; this.fetchDelay = fetchDelay;
} }
public void recordEmitDelay(long emitDelay) {
this.emitDelay = emitDelay;
}
} }

@ -130,19 +130,16 @@ public final class MySqlRecordEmitter<T>
} }
private void reportMetrics(SourceRecord element) { private void reportMetrics(SourceRecord element) {
long now = System.currentTimeMillis();
// record the latest process time
sourceReaderMetrics.recordProcessTime(now);
Long messageTimestamp = getMessageTimestamp(element); Long messageTimestamp = getMessageTimestamp(element);
if (messageTimestamp != null && messageTimestamp > 0L) { if (messageTimestamp != null && messageTimestamp > 0L) {
// report fetch delay // report fetch delay
Long fetchTimestamp = getFetchTimestamp(element); Long fetchTimestamp = getFetchTimestamp(element);
if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) { if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) {
// report fetch delay
sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp); sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);
} }
// report emit delay
sourceReaderMetrics.recordEmitDelay(now - messageTimestamp);
} }
} }

@ -684,9 +684,6 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
} }
private void reportMetrics(SourceRecord element) { private void reportMetrics(SourceRecord element) {
long now = System.currentTimeMillis();
// record the latest process time
sourceReaderMetrics.recordProcessTime(now);
Long messageTimestamp = getMessageTimestamp(element); Long messageTimestamp = getMessageTimestamp(element);
if (messageTimestamp != null && messageTimestamp > 0L) { if (messageTimestamp != null && messageTimestamp > 0L) {
@ -695,8 +692,6 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) { if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) {
sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp); sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);
} }
// report emit delay
sourceReaderMetrics.recordEmitDelay(now - messageTimestamp);
} }
} }

Loading…
Cancel
Save