diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java index b4e32875d..d45b45395 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java @@ -41,6 +41,7 @@ import com.ververica.cdc.connectors.mysql.source.assigners.state.HybridPendingSp import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState; import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer; import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator; +import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter; import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader; import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader; @@ -102,12 +103,15 @@ public class MySqlParallelSource FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); final Configuration readerConfiguration = getReaderConfig(readerContext); + final MySqlSourceReaderMetrics sourceReaderMetrics = + new MySqlSourceReaderMetrics(readerContext.metricGroup()); + sourceReaderMetrics.registerMetrics(); Supplier splitReaderSupplier = () -> new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask()); return new MySqlSourceReader<>( elementsQueue, splitReaderSupplier, - new MySqlRecordEmitter<>(deserializationSchema), + new MySqlRecordEmitter<>(deserializationSchema, sourceReaderMetrics), readerConfiguration, readerContext); } diff --git 
a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java new file mode 100644 index 000000000..36532a281 --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.metrics; + +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; + +import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
+
+/**
+ * A collection class for handling metrics in {@link MySqlSourceReader}.
+ *
+ * <p>Values are pushed in through the {@code record*} methods and exposed as gauges on the
+ * {@link MetricGroup} given to the constructor once {@link #registerMetrics()} has been called.
+ * All recorded values are kept in {@code volatile} fields.
+ */
+public class MySqlSourceReaderMetrics {
+
+    private final MetricGroup metricGroup;
+
+    /**
+     * The last record processing time, which is updated after {@link MySqlSourceReader} fetches a
+     * batch of data. It's mainly used to report metrics sourceIdleTime for sourceIdleTime =
+     * System.currentTimeMillis() - processTime. The value 0 means nothing has been recorded yet.
+     */
+    private volatile long processTime = 0L;
+
+    /**
+     * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
+     * record fetched into the source operator.
+     */
+    private volatile long fetchDelay = 0L;
+
+    /**
+     * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
+     * source operator.
+     */
+    private volatile long emitDelay = 0L;
+
+    /**
+     * Creates the metrics holder; no gauge is registered until {@link #registerMetrics()} is
+     * called.
+     *
+     * @param metricGroup the group the gauges are registered on
+     */
+    public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+    }
+
+    /**
+     * Registers the gauges {@code currentFetchEventTimeLag}, {@code currentEmitEventTimeLag} and
+     * {@code sourceIdleTime} on the metric group given at construction time.
+     */
+    public void registerMetrics() {
+        // parameterized casts instead of raw Gauge, so the registration is free of unchecked warnings
+        metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) this::getFetchDelay);
+        metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) this::getEmitDelay);
+        metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime);
+    }
+
+    /** Returns the value last passed to {@link #recordFetchDelay(long)}, or 0 if none was. */
+    public long getFetchDelay() {
+        return fetchDelay;
+    }
+
+    /** Returns the value last passed to {@link #recordEmitDelay(long)}, or 0 if none was. */
+    public long getEmitDelay() {
+        return emitDelay;
+    }
+
+    /**
+     * Returns the difference between the current wall-clock time and the last recorded process
+     * time, or 0 when no process time has been recorded yet.
+     */
+    public long getIdleTime() {
+        // read the volatile field once so the check and the subtraction use the same value
+        final long lastProcessTime = processTime;
+        // no previous process time at the beginning, return 0 as idle time
+        if (lastProcessTime == 0) {
+            return 0;
+        }
+        return System.currentTimeMillis() - lastProcessTime;
+    }
+
+    /** Stores the latest process time, which {@link #getIdleTime()} measures against. */
+    public void recordProcessTime(long processTime) {
+        this.processTime = processTime;
+    }
+
+    /** Stores the latest fetch delay reported by the {@code currentFetchEventTimeLag} gauge. */
+    public void recordFetchDelay(long fetchDelay) {
+        this.fetchDelay = fetchDelay;
+    }
+
+    /** Stores the latest emit delay reported by the {@code currentEmitEventTimeLag} gauge. */
+    public void recordEmitDelay(long emitDelay) {
+        this.emitDelay = emitDelay;
+    }
+}
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 8f39bb8d8..8cfcf1ec9 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceOutput; import 
org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.util.Collector; +import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; @@ -34,7 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getFetchTimestamp; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getMessageTimestamp; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getWatermark; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; @@ -55,9 +58,13 @@ public final class MySqlRecordEmitter new JsonTableChangeSerializer(); private final DebeziumDeserializationSchema debeziumDeserializationSchema; + private final MySqlSourceReaderMetrics sourceReaderMetrics; - public MySqlRecordEmitter(DebeziumDeserializationSchema debeziumDeserializationSchema) { + /** + * Creates an emitter that deserializes records with the given schema and reports per-record + * metrics to the given metrics holder. + * + * @param debeziumDeserializationSchema the schema used to deserialize emitted records + * @param sourceReaderMetrics the holder that receives process time, fetch delay and emit delay + */ + public MySqlRecordEmitter( + DebeziumDeserializationSchema debeziumDeserializationSchema, + MySqlSourceReaderMetrics sourceReaderMetrics) { this.debeziumDeserializationSchema = debeziumDeserializationSchema; + this.sourceReaderMetrics = sourceReaderMetrics; } @Override @@ -81,6 +88,7 @@ public final class MySqlRecordEmitter BinlogOffset position = getBinlogPosition(element); splitState.asBinlogSplitState().setStartingOffset(position); } + // update the reader metrics before the record is handed to the deserialization schema + reportMetrics(element); debeziumDeserializationSchema.deserialize( element, new Collector() { @@ -99,4 +107,21 @@ 
public final class MySqlRecordEmitter LOG.info("Meet unknown element {}, just skip.", element); } } + + private void reportMetrics(SourceRecord element) { + long now = System.currentTimeMillis(); + // record the latest process time + sourceReaderMetrics.recordProcessTime(now); + Long messageTimestamp = getMessageTimestamp(element); + + if (messageTimestamp != null && messageTimestamp > 0L) { + // report fetch delay + Long fetchTimestamp = getFetchTimestamp(element); + if (fetchTimestamp != null) { + sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp); + } + // report emit delay + sourceReaderMetrics.recordEmitDelay(now - messageTimestamp); + } + } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java index f1f1d955c..72c32deb8 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.mysql.source.utils; import org.apache.flink.table.types.logical.RowType; import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind; +import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; @@ -235,6 +236,42 @@ public class RecordUtils { return new BinlogOffset(file, position); } + /** + * Return the timestamp when the change event is produced in MySQL. + * + *

The field `source.ts_ms` in {@link SourceRecord} data struct is the time when the change + * event is operated in MySQL. + */ + public static Long getMessageTimestamp(SourceRecord record) { + Schema schema = record.valueSchema(); + Struct value = (Struct) record.value(); + if (schema.field(Envelope.FieldName.SOURCE) == null) { + return null; + } + + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) { + return null; + } + + return source.getInt64(Envelope.FieldName.TIMESTAMP); + } + + /** + * Return the timestamp when the change event is fetched in {@link DebeziumReader}. + * + *

The field `ts_ms` in {@link SourceRecord} data struct is the time when the record fetched + * by debezium reader, use it as the process time in Source. + */ + public static Long getFetchTimestamp(SourceRecord record) { + Schema schema = record.valueSchema(); + Struct value = (Struct) record.value(); + if (schema.field(Envelope.FieldName.TIMESTAMP) == null) { + return null; + } + return value.getInt64(Envelope.FieldName.TIMESTAMP); + } + public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) { Schema keySchema = sourceRecord.keySchema(); if (keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())) {