From 5539094917e190e50416c74117afdcfe74bee2e7 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Sat, 27 Mar 2021 22:24:35 +0800 Subject: [PATCH] [debezium] Add source metrics according to FLIP-33 --- .../cdc/debezium/DebeziumSourceFunction.java | 10 ++++ .../internal/DebeziumChangeConsumer.java | 58 +++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java index a22ee4a80..b111eae21 100644 --- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java @@ -29,6 +29,8 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -373,6 +375,14 @@ public class DebeziumSourceFunction extends RichSourceFunction executor.execute(engine); debeziumStarted = true; + // initialize metrics + MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); + metricGroup.gauge( + "currentFetchEventTimeLag", (Gauge) () -> debeziumConsumer.getFetchDelay()); + metricGroup.gauge( + "currentEmitEventTimeLag", (Gauge) () -> debeziumConsumer.getEmitDelay()); + metricGroup.gauge("sourceIdleTime", (Gauge) () -> debeziumConsumer.getIdleTime()); + // on a clean exit, wait for the runner thread try { while (running) { diff --git 
a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java index 77a8fc845..574793339 100644 --- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java @@ -80,6 +80,28 @@ public class DebeziumChangeConsumer private boolean lockHold = false; + // --------------------------------------------------------------------------------------- + // Metrics + // --------------------------------------------------------------------------------------- + + /** Timestamp of the change event. If the event is a snapshot event, the timestamp is 0L. */ + private volatile long messageTimestamp = 0L; + + /** The last record processing time. */ + private volatile long processTime = 0L; + + /** + * currentFetchEventTimeLag = FetchTime - messageTimestamp, where FetchTime is the time the + * record was fetched into the source operator. + */ + private volatile long fetchDelay = 0L; + + /** + * emitDelay = EmitTime - messageTimestamp, where EmitTime is the time the record leaves the + * source operator. 
+ */ + private volatile long emitDelay = 0L; + private DebeziumEngine.RecordCommitter> currentCommitter; @@ -108,9 +130,13 @@ public class DebeziumChangeConsumer DebeziumEngine.RecordCommitter> committer) throws InterruptedException { this.currentCommitter = committer; + this.processTime = System.currentTimeMillis(); try { for (ChangeEvent event : changeEvents) { SourceRecord record = event.value(); + updateMessageTimestamp(record); + fetchDelay = processTime - messageTimestamp; + if (isHeartbeatEvent(record)) { // keep offset update synchronized (checkpointLock) { @@ -148,6 +174,24 @@ public class DebeziumChangeConsumer } } + private void updateMessageTimestamp(SourceRecord record) { + Schema schema = record.valueSchema(); + Struct value = (Struct) record.value(); + if (schema.field(Envelope.FieldName.SOURCE) == null) { + return; + } + + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) { + return; + } + + Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP); + if (tsMs != null) { + this.messageTimestamp = tsMs; + } + } + private boolean isHeartbeatEvent(SourceRecord record) { String topic = record.topic(); return topic != null && topic.startsWith(heartbeatTopicPrefix); @@ -184,8 +228,10 @@ public class DebeziumChangeConsumer /** Emits a batch of records. 
*/ private void emitRecords( Queue records, Map sourcePartition, Map sourceOffset) { + long currentTimestamp = System.currentTimeMillis(); T record; while ((record = records.poll()) != null) { + emitDelay = currentTimestamp - messageTimestamp; sourceContext.collect(record); } // update offset to state @@ -230,6 +276,18 @@ public class DebeziumChangeConsumer currentCommitter.markBatchFinished(); } + public long getFetchDelay() { + return fetchDelay; + } + + public long getEmitDelay() { + return emitDelay; + } + + public long getIdleTime() { + return System.currentTimeMillis() - processTime; + } + /** * We have to adjust type of LSN values to Long, because it might be Integer after * deserialization, however {@code