[debezium] Add source metrics according to FLIP-33

release-1.3
Jark Wu 4 years ago
parent ea01014c64
commit 5539094917
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -29,6 +29,8 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@ -373,6 +375,14 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
executor.execute(engine); executor.execute(engine);
debeziumStarted = true; debeziumStarted = true;
// initialize metrics
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
metricGroup.gauge(
"currentFetchEventTimeLag", (Gauge<Long>) () -> debeziumConsumer.getFetchDelay());
metricGroup.gauge(
"currentEmitEventTimeLag", (Gauge<Long>) () -> debeziumConsumer.getEmitDelay());
metricGroup.gauge("sourceIdleTime", (Gauge<Long>) () -> debeziumConsumer.getIdleTime());
// on a clean exit, wait for the runner thread // on a clean exit, wait for the runner thread
try { try {
while (running) { while (running) {

@ -80,6 +80,28 @@ public class DebeziumChangeConsumer<T>
private boolean lockHold = false; private boolean lockHold = false;
// ---------------------------------------------------------------------------------------
// Metrics
// ---------------------------------------------------------------------------------------
/** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
private volatile long messageTimestamp = 0L;
/** The last record processing time. */
private volatile long processTime = 0L;
/**
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator.
*/
private volatile long fetchDelay = 0L;
/**
* emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
* source operator.
*/
private volatile long emitDelay = 0L;
private DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> private DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>>
currentCommitter; currentCommitter;
@ -108,9 +130,13 @@ public class DebeziumChangeConsumer<T>
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException { throws InterruptedException {
this.currentCommitter = committer; this.currentCommitter = committer;
this.processTime = System.currentTimeMillis();
try { try {
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) { for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
SourceRecord record = event.value(); SourceRecord record = event.value();
updateMessageTimestamp(record);
fetchDelay = processTime - messageTimestamp;
if (isHeartbeatEvent(record)) { if (isHeartbeatEvent(record)) {
// keep offset update // keep offset update
synchronized (checkpointLock) { synchronized (checkpointLock) {
@ -148,6 +174,24 @@ public class DebeziumChangeConsumer<T>
} }
} }
private void updateMessageTimestamp(SourceRecord record) {
Schema schema = record.valueSchema();
Struct value = (Struct) record.value();
if (schema.field(Envelope.FieldName.SOURCE) == null) {
return;
}
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
return;
}
Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
if (tsMs != null) {
this.messageTimestamp = tsMs;
}
}
private boolean isHeartbeatEvent(SourceRecord record) { private boolean isHeartbeatEvent(SourceRecord record) {
String topic = record.topic(); String topic = record.topic();
return topic != null && topic.startsWith(heartbeatTopicPrefix); return topic != null && topic.startsWith(heartbeatTopicPrefix);
@ -184,8 +228,10 @@ public class DebeziumChangeConsumer<T>
/** Emits a batch of records. */ /** Emits a batch of records. */
private void emitRecords( private void emitRecords(
Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) { Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
long currentTimestamp = System.currentTimeMillis();
T record; T record;
while ((record = records.poll()) != null) { while ((record = records.poll()) != null) {
emitDelay = currentTimestamp - messageTimestamp;
sourceContext.collect(record); sourceContext.collect(record);
} }
// update offset to state // update offset to state
@ -230,6 +276,18 @@ public class DebeziumChangeConsumer<T>
currentCommitter.markBatchFinished(); currentCommitter.markBatchFinished();
} }
public long getFetchDelay() {
return fetchDelay;
}
public long getEmitDelay() {
return emitDelay;
}
public long getIdleTime() {
return System.currentTimeMillis() - processTime;
}
/** /**
* We have to adjust type of LSN values to Long, because it might be Integer after * We have to adjust type of LSN values to Long, because it might be Integer after
* deserialization, however {@code * deserialization, however {@code

Loading…
Cancel
Save