[mysql] Add source metrics for MySqlParallelSource

Co-authored-by: Leonard Xu <xbjtdcq@gmail.com>
pull/459/head
luoyuxia 3 years ago committed by Leonard Xu
parent 27001d8421
commit ccb73e013b

@ -41,6 +41,7 @@ import com.ververica.cdc.connectors.mysql.source.assigners.state.HybridPendingSp
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState; import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer; import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator; import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter; import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader; import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader; import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
@ -102,12 +103,15 @@ public class MySqlParallelSource<T>
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue = FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
new FutureCompletingBlockingQueue<>(); new FutureCompletingBlockingQueue<>();
final Configuration readerConfiguration = getReaderConfig(readerContext); final Configuration readerConfiguration = getReaderConfig(readerContext);
final MySqlSourceReaderMetrics sourceReaderMetrics =
new MySqlSourceReaderMetrics(readerContext.metricGroup());
sourceReaderMetrics.registerMetrics();
Supplier<MySqlSplitReader> splitReaderSupplier = Supplier<MySqlSplitReader> splitReaderSupplier =
() -> new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask()); () -> new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask());
return new MySqlSourceReader<>( return new MySqlSourceReader<>(
elementsQueue, elementsQueue,
splitReaderSupplier, splitReaderSupplier,
new MySqlRecordEmitter<>(deserializationSchema), new MySqlRecordEmitter<>(deserializationSchema, sourceReaderMetrics),
readerConfiguration, readerConfiguration,
readerContext); readerContext);
} }

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.metrics;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
 * Holder for the metric values exposed on behalf of a {@link MySqlSourceReader}.
 *
 * <p>The values are written through the {@code record*} methods and published as gauges on the
 * supplied {@link MetricGroup} once {@link #registerMetrics()} has been called.
 */
public class MySqlSourceReaderMetrics {

    /** Group on which the gauges are registered. */
    private final MetricGroup metricGroup;

    /**
     * Time of the most recent record processing, as passed to {@link #recordProcessTime(long)}.
     * It backs the sourceIdleTime gauge: sourceIdleTime = System.currentTimeMillis() -
     * processTime. A value of 0 means no processing time has been recorded yet.
     */
    private volatile long processTime = 0L;

    /**
     * Backs the currentFetchEventTimeLag gauge: FetchTime - messageTimestamp, where FetchTime is
     * the time the record was fetched into the source operator.
     */
    private volatile long fetchDelay = 0L;

    /**
     * Backs the currentEmitEventTimeLag gauge: EmitTime - messageTimestamp, where EmitTime is the
     * time the record leaves the source operator.
     */
    private volatile long emitDelay = 0L;

    /**
     * Creates the metrics holder; nothing is registered until {@link #registerMetrics()} runs.
     *
     * @param metricGroup the group that will receive the gauges
     */
    public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
        this.metricGroup = metricGroup;
    }

    /** Registers the fetch lag, emit lag and idle time gauges on the metric group. */
    public void registerMetrics() {
        final Gauge<Long> fetchLagGauge = this::getFetchDelay;
        metricGroup.gauge("currentFetchEventTimeLag", fetchLagGauge);

        final Gauge<Long> emitLagGauge = this::getEmitDelay;
        metricGroup.gauge("currentEmitEventTimeLag", emitLagGauge);

        final Gauge<Long> idleTimeGauge = this::getIdleTime;
        metricGroup.gauge("sourceIdleTime", idleTimeGauge);
    }

    /** @return the last fetch delay recorded, or 0 if none was recorded */
    public long getFetchDelay() {
        return fetchDelay;
    }

    /** @return the last emit delay recorded, or 0 if none was recorded */
    public long getEmitDelay() {
        return emitDelay;
    }

    /**
     * @return milliseconds elapsed since the recorded process time, or 0 while no process time
     *     has been recorded
     */
    public long getIdleTime() {
        // a process time of 0 means nothing has been processed yet, so report no idle time
        return processTime == 0 ? 0L : System.currentTimeMillis() - processTime;
    }

    /** @param processTime the time, in epoch milliseconds, at which a record was processed */
    public void recordProcessTime(long processTime) {
        this.processTime = processTime;
    }

    /** @param fetchDelay the fetch delay in milliseconds to publish */
    public void recordFetchDelay(long fetchDelay) {
        this.fetchDelay = fetchDelay;
    }

    /** @param emitDelay the emit delay in milliseconds to publish */
    public void recordEmitDelay(long emitDelay) {
        this.emitDelay = emitDelay;
    }
}

@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
@ -34,7 +35,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getFetchTimestamp;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getHistoryRecord;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getMessageTimestamp;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getWatermark; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getWatermark;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
@ -55,9 +58,13 @@ public final class MySqlRecordEmitter<T>
new JsonTableChangeSerializer(); new JsonTableChangeSerializer();
private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema; private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
private final MySqlSourceReaderMetrics sourceReaderMetrics;
public MySqlRecordEmitter(DebeziumDeserializationSchema<T> debeziumDeserializationSchema) { public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics) {
this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceReaderMetrics = sourceReaderMetrics;
} }
@Override @Override
@ -81,6 +88,7 @@ public final class MySqlRecordEmitter<T>
BinlogOffset position = getBinlogPosition(element); BinlogOffset position = getBinlogPosition(element);
splitState.asBinlogSplitState().setStartingOffset(position); splitState.asBinlogSplitState().setStartingOffset(position);
} }
reportMetrics(element);
debeziumDeserializationSchema.deserialize( debeziumDeserializationSchema.deserialize(
element, element,
new Collector<T>() { new Collector<T>() {
@ -99,4 +107,21 @@ public final class MySqlRecordEmitter<T>
LOG.info("Meet unknown element {}, just skip.", element); LOG.info("Meet unknown element {}, just skip.", element);
} }
} }
/**
 * Updates the reader metrics for one record.
 *
 * <p>The process time is always refreshed. The fetch and emit delays are only refreshed when
 * the record carries a positive message timestamp; the fetch delay additionally requires a
 * fetch timestamp.
 *
 * @param element the record about to be emitted
 */
private void reportMetrics(SourceRecord element) {
    final long emitTime = System.currentTimeMillis();
    // remember when the latest record was handled
    sourceReaderMetrics.recordProcessTime(emitTime);

    final Long eventTime = getMessageTimestamp(element);
    if (eventTime == null || eventTime <= 0L) {
        // no usable message timestamp, so no delay can be derived
        return;
    }

    // fetch delay: fetch time relative to the message timestamp
    final Long fetchTime = getFetchTimestamp(element);
    if (fetchTime != null) {
        sourceReaderMetrics.recordFetchDelay(fetchTime - eventTime);
    }

    // emit delay: current time relative to the message timestamp
    sourceReaderMetrics.recordEmitDelay(emitTime - eventTime);
}
} }

@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.mysql.source.utils;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind; import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind;
import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
@ -235,6 +236,42 @@ public class RecordUtils {
return new BinlogOffset(file, position); return new BinlogOffset(file, position);
} }
/**
 * Return the timestamp when the change event is produced in MySQL.
 *
 * <p>The field {@code source.ts_ms} in {@link SourceRecord} data struct is the time when the
 * change event is operated in MySQL.
 *
 * @param record the record to read the timestamp from
 * @return the value of {@code source.ts_ms}, or {@code null} when the record has no value
 *     schema or value, has no source field, has a null source struct, or the source struct has
 *     no timestamp field
 */
public static Long getMessageTimestamp(SourceRecord record) {
    Schema schema = record.valueSchema();
    Struct value = (Struct) record.value();
    // records without a value or value schema carry no timestamp; report "absent" instead
    // of failing with a NullPointerException
    if (schema == null || value == null) {
        return null;
    }
    if (schema.field(Envelope.FieldName.SOURCE) == null) {
        return null;
    }

    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
    // the field may be declared in the schema while its value is unset
    if (source == null) {
        return null;
    }
    if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
        return null;
    }

    return source.getInt64(Envelope.FieldName.TIMESTAMP);
}
/**
 * Return the timestamp when the change event is fetched in {@link DebeziumReader}.
 *
 * <p>The field {@code ts_ms} in {@link SourceRecord} data struct is the time when the record
 * fetched by debezium reader, use it as the process time in Source.
 *
 * @param record the record to read the timestamp from
 * @return the value of {@code ts_ms}, or {@code null} when the record has no value schema or
 *     value, or the value schema has no timestamp field
 */
public static Long getFetchTimestamp(SourceRecord record) {
    Schema schema = record.valueSchema();
    Struct value = (Struct) record.value();
    // records without a value or value schema carry no timestamp; report "absent" instead
    // of failing with a NullPointerException
    if (schema == null || value == null) {
        return null;
    }
    if (schema.field(Envelope.FieldName.TIMESTAMP) == null) {
        return null;
    }
    return value.getInt64(Envelope.FieldName.TIMESTAMP);
}
public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) { public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
Schema keySchema = sourceRecord.keySchema(); Schema keySchema = sourceRecord.keySchema();
if (keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())) { if (keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())) {

Loading…
Cancel
Save