[common] Add support for JDK 9+ (#194)

pull/205/head
Shengkai 4 years ago committed by GitHub
parent 6aa9f43a4e
commit 679b940341
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -40,10 +40,12 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffset;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.alibaba.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
import com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
import com.alibaba.ververica.cdc.debezium.internal.Handover;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
@ -71,6 +73,22 @@ import java.util.concurrent.TimeUnit;
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
* from databases into Flink.
*
* <p>There are two workers during the runtime. One worker periodically pulls records from the
* database and pushes the records into the {@link Handover}. The other worker consumes the records
* from the {@link Handover} and convert the records to the data in Flink style. The reason why
* don't use one workers is because debezium has different behaviours in snapshot phase and
* streaming phase.
*
* <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the
* consumer. Because the two threads don't communicate to each other directly, the error reporting
* also relies on {@link Handover}. When the engine gets errors, the engine uses the {@link
* DebeziumEngine.CompletionCallback} to report errors to the {@link Handover} and wakes up the
* consumer to check the error. However, the source function just closes the engine and wakes up the
* producer if the error is from the Flink side.
*
* <p>If the execution is canceled or finish(only snapshot phase), the exit logic is as same as the
* logic in the error reporting.
*
* <p>The source function participates in checkpointing and guarantees that no data is lost during a
* failure, and that the computation processes elements "exactly once".
*
@ -96,7 +114,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
// -------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------
// Properties
// ---------------------------------------------------------------------------------------
/** The schema to convert from Debezium's messages into Flink's objects. */
private final DebeziumDeserializationSchema<T> deserializer;
@ -110,20 +130,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Data for pending but uncommitted offsets. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
private ExecutorService executor;
private DebeziumEngine<?> engine;
/** The error from {@link #engine} thread. */
private transient volatile Throwable error;
/** Flag indicating whether the consumer is still running. */
private volatile boolean running = true;
/** Flag indicating whether the Debezium Engine is started. */
private volatile boolean debeziumStarted = false;
/** The consumer to fetch records from {@link DebeziumEngine}. */
private transient volatile DebeziumChangeConsumer<T> debeziumConsumer;
// ---------------------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------------------
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
@ -145,12 +157,29 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
*/
private transient ListState<String> historyRecordsState;
// ---------------------------------------------------------------------------------------
// Worker
// ---------------------------------------------------------------------------------------
private transient ExecutorService executor;
private transient DebeziumEngine<?> engine;
/**
* Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
* generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
*/
private transient String engineInstanceName;
/** Consume the events from the engine and commit the offset to the engine. */
private transient DebeziumChangeConsumer changeConsumer;
/** The consumer to fetch records from {@link Handover}. */
private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;
// ---------------------------------------------------------------------------------------
public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@ -166,6 +195,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
}
// ------------------------------------------------------------------------
@ -248,7 +279,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (!running) {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
@ -259,20 +290,20 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private void snapshotOffsetState(long checkpointId) throws Exception {
offsetState.clear();
final DebeziumChangeConsumer<?> consumer = this.debeziumConsumer;
final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
byte[] serializedOffset = null;
if (consumer == null) {
// the consumer has not yet been initialized, which means we need to return the
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets
if (restoredOffsetState != null) {
serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
}
} else {
byte[] currentState = consumer.snapshotCurrentState();
byte[] currentState = fetcher.snapshotCurrentState();
if (currentState == null && restoredOffsetState != null) {
// the consumer has been initialized, but has not yet received any data,
// which means we need to return the originally restored offsets
// the fetcher has been initialized, but has not yet received any data,
// which means we need to return the originally restored offsets.
serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
} else {
serializedOffset = currentState;
@ -345,32 +376,31 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
properties.getProperty(
Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
this.debeziumConsumer =
new DebeziumChangeConsumer<>(
this.debeziumChangeFetcher =
new DebeziumChangeFetcher<>(
sourceContext,
deserializer,
restoredOffsetState == null, // DB snapshot phase if restore state is null
this::reportError,
dbzHeartbeatPrefix);
dbzHeartbeatPrefix,
handover);
// create the engine with this configuration ...
this.engine =
DebeziumEngine.create(Connect.class)
.using(properties)
.notifying(debeziumConsumer)
.notifying(changeConsumer)
.using(OffsetCommitPolicy.always())
.using(
(success, message, error) -> {
if (!success && error != null) {
this.reportError(error);
if (success) {
// Close the handover and prepare to exit.
handover.close();
} else {
handover.reportError(error);
}
})
.build();
if (!running) {
return;
}
// run the engine asynchronously
executor.execute(engine);
debeziumStarted = true;
@ -378,40 +408,27 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
// initialize metrics
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
metricGroup.gauge(
"currentFetchEventTimeLag", (Gauge<Long>) () -> debeziumConsumer.getFetchDelay());
"currentFetchEventTimeLag",
(Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
metricGroup.gauge(
"currentEmitEventTimeLag",
(Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
metricGroup.gauge(
"currentEmitEventTimeLag", (Gauge<Long>) () -> debeziumConsumer.getEmitDelay());
metricGroup.gauge("sourceIdleTime", (Gauge<Long>) () -> debeziumConsumer.getIdleTime());
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
// on a clean exit, wait for the runner thread
try {
while (running) {
if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
break;
}
if (error != null) {
running = false;
shutdownEngine();
// rethrow the error from Debezium consumer
ExceptionUtils.rethrow(error);
}
}
} catch (InterruptedException e) {
// may be the result of a wake-up interruption after an exception.
// we ignore this here and only restore the interruption state
Thread.currentThread().interrupt();
}
// start the real debezium consumer
debeziumChangeFetcher.runFetchLoop();
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (!running) {
LOG.debug("notifyCheckpointComplete() called on closed source");
public void notifyCheckpointComplete(long checkpointId) {
if (!debeziumStarted) {
LOG.debug("notifyCheckpointComplete() called when engine is not started.");
return;
}
final DebeziumChangeConsumer<T> consumer = this.debeziumConsumer;
if (consumer == null) {
final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
if (fetcher == null) {
LOG.debug("notifyCheckpointComplete() called on uninitialized source");
return;
}
@ -442,7 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
DebeziumOffset offset =
DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
consumer.commitOffset(offset);
changeConsumer.commitOffset(offset);
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
@ -451,10 +468,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
@Override
public void cancel() {
// flag the main thread to exit. A thread interrupt will come anyways.
running = false;
// safely and gracefully stop the engine
shutdownEngine();
debeziumChangeFetcher.close();
}
@Override
@ -468,15 +484,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
super.close();
}
// --------------------------------------------------------------------------------
// Error callbacks
// --------------------------------------------------------------------------------
private void reportError(Throwable error) {
LOG.error("Reporting error:", error);
this.error = error;
}
/** Safely and gracefully stop the Debezium engine. */
private void shutdownEngine() {
try {
@ -487,9 +494,14 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
ExceptionUtils.rethrow(e);
} finally {
if (executor != null) {
executor.shutdown();
executor.shutdownNow();
}
debeziumStarted = false;
if (handover != null) {
handover.close();
}
}
}

@ -19,246 +19,55 @@
package com.alibaba.ververica.cdc.debezium.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.connector.SnapshotRecord;
import io.debezium.data.Envelope;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
/**
* A consumer that consumes change messages from {@link DebeziumEngine}.
*
* @param <T> The type of elements produced by the consumer.
*/
/** Consume debezium change events. */
@Internal
public class DebeziumChangeConsumer<T>
public class DebeziumChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
private final SourceFunction.SourceContext<T> sourceContext;
/**
* The lock that guarantees that record emission and state updates are atomic, from the view of
* taking a checkpoint.
*/
private final Object checkpointLock;
/** The schema to convert from Debezium's messages into Flink's objects. */
private final DebeziumDeserializationSchema<T> deserialization;
/** A collector to emit records in batch (bundle). * */
private final DebeziumCollector debeziumCollector;
private final ErrorReporter errorReporter;
private final DebeziumOffset debeziumOffset;
private final DebeziumOffsetSerializer stateSerializer;
private final String heartbeatTopicPrefix;
private boolean isInDbSnapshotPhase;
private boolean lockHold = false;
// ---------------------------------------------------------------------------------------
// Metrics
// ---------------------------------------------------------------------------------------
/** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
private volatile long messageTimestamp = 0L;
/** The last record processing time. */
private volatile long processTime = 0L;
/**
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator.
*/
private volatile long fetchDelay = 0L;
/**
* emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
* source operator.
*/
private volatile long emitDelay = 0L;
private DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>>
currentCommitter;
// ------------------------------------------------------------------------
private final Handover handover;
// keep the modification is visible to the source function
private volatile RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;
public DebeziumChangeConsumer(
SourceFunction.SourceContext<T> sourceContext,
DebeziumDeserializationSchema<T> deserialization,
boolean isInDbSnapshotPhase,
ErrorReporter errorReporter,
String heartbeatTopicPrefix) {
this.sourceContext = sourceContext;
this.checkpointLock = sourceContext.getCheckpointLock();
this.deserialization = deserialization;
this.isInDbSnapshotPhase = isInDbSnapshotPhase;
this.heartbeatTopicPrefix = heartbeatTopicPrefix;
this.debeziumCollector = new DebeziumCollector();
this.errorReporter = errorReporter;
this.debeziumOffset = new DebeziumOffset();
this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
public DebeziumChangeConsumer(Handover handover) {
this.handover = handover;
}
@Override
public void handleBatch(
List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents,
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException {
this.currentCommitter = committer;
this.processTime = System.currentTimeMillis();
List<ChangeEvent<SourceRecord, SourceRecord>> events,
RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
try {
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
SourceRecord record = event.value();
updateMessageTimestamp(record);
fetchDelay = processTime - messageTimestamp;
if (isHeartbeatEvent(record)) {
// keep offset update
synchronized (checkpointLock) {
debeziumOffset.setSourcePartition(record.sourcePartition());
debeziumOffset.setSourceOffset(record.sourceOffset());
}
// drop heartbeat events
continue;
}
deserialization.deserialize(record, debeziumCollector);
if (isInDbSnapshotPhase) {
if (!lockHold) {
MemoryUtils.UNSAFE.monitorEnter(checkpointLock);
lockHold = true;
LOG.info(
"Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
}
if (!isSnapshotRecord(record)) {
MemoryUtils.UNSAFE.monitorExit(checkpointLock);
isInDbSnapshotPhase = false;
LOG.info(
"Received record from streaming binlog phase, released checkpoint lock.");
}
}
// emit the actual records. this also updates offset state atomically
emitRecordsUnderCheckpointLock(
debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
}
} catch (Exception e) {
LOG.error("Error happens when consuming change messages.", e);
errorReporter.reportError(e);
currentCommitter = recordCommitter;
handover.produce(events);
} catch (Throwable e) {
// Hold this exception in handover and trigger the fetcher to exit
handover.reportError(e);
}
}
private void updateMessageTimestamp(SourceRecord record) {
Schema schema = record.valueSchema();
Struct value = (Struct) record.value();
if (schema.field(Envelope.FieldName.SOURCE) == null) {
return;
}
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
return;
}
Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
if (tsMs != null) {
this.messageTimestamp = tsMs;
}
}
private boolean isHeartbeatEvent(SourceRecord record) {
String topic = record.topic();
return topic != null && topic.startsWith(heartbeatTopicPrefix);
}
private boolean isSnapshotRecord(SourceRecord record) {
Struct value = (Struct) record.value();
if (value != null) {
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
// even if it is the last record of snapshot, i.e. SnapshotRecord.LAST
// we can still recover from checkpoint and continue to read the binlog,
// because the checkpoint contains binlog position
return SnapshotRecord.TRUE == snapshotRecord;
}
return false;
}
private void emitRecordsUnderCheckpointLock(
Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset)
throws InterruptedException {
if (isInDbSnapshotPhase) {
// lockHolderThread holds the lock, don't need to hold it again
emitRecords(records, sourcePartition, sourceOffset);
} else {
// emit the records, using the checkpoint lock to guarantee
// atomicity of record emission and offset state update
synchronized (checkpointLock) {
emitRecords(records, sourcePartition, sourceOffset);
}
}
}
/** Emits a batch of records. */
private void emitRecords(
Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
long currentTimestamp = System.currentTimeMillis();
T record;
while ((record = records.poll()) != null) {
emitDelay = currentTimestamp - messageTimestamp;
sourceContext.collect(record);
}
// update offset to state
debeziumOffset.setSourcePartition(sourcePartition);
debeziumOffset.setSourceOffset(sourceOffset);
}
/**
* Takes a snapshot of the Debezium Consumer state.
*
* <p>Important: This method must be called under the checkpoint lock.
*/
public byte[] snapshotCurrentState() throws Exception {
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock);
if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
return null;
}
return stateSerializer.serialize(debeziumOffset);
}
@SuppressWarnings("unchecked")
public void commitOffset(DebeziumOffset offset) throws InterruptedException {
// Although the committer is read/write by multi-thread, the committer will be not changed
// frequently.
if (currentCommitter == null) {
LOG.info(
"commitOffset() called on Debezium ChangeConsumer which doesn't receive records yet.");
"commitOffset() called on Debezium change consumer which doesn't receive records yet.");
return;
}
@ -276,18 +85,6 @@ public class DebeziumChangeConsumer<T>
currentCommitter.markBatchFinished();
}
public long getFetchDelay() {
return fetchDelay;
}
public long getEmitDelay() {
return emitDelay;
}
public long getIdleTime() {
return System.currentTimeMillis() - processTime;
}
/**
* We have to adjust type of LSN values to Long, because it might be Integer after
* deserialization, however {@code
@ -305,16 +102,4 @@ public class DebeziumChangeConsumer<T>
}
return sourceOffset;
}
private class DebeziumCollector implements Collector<T> {
private final Queue<T> records = new ArrayDeque<>();
@Override
public void collect(T record) {
records.add(record);
}
@Override
public void close() {}
}
}

@ -0,0 +1,297 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.debezium.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.connector.SnapshotRecord;
import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
/**
* A Handler that convert change messages from {@link DebeziumEngine} to data in Flink. Considering
* Debezium in different mode has different strategies to hold the lock, e.g. snapshot, the handler
* also needs different strategy. In snapshot phase, the handler needs to hold the lock until the
* snapshot finishes. But in non-snapshot phase, the handler only needs to hold the lock when
* emitting the records.
*
* @param <T> The type of elements produced by the handler.
*/
@Internal
public class DebeziumChangeFetcher<T> {
private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);
private final SourceFunction.SourceContext<T> sourceContext;
/**
* The lock that guarantees that record emission and state updates are atomic, from the view of
* taking a checkpoint.
*/
private final Object checkpointLock;
/** The schema to convert from Debezium's messages into Flink's objects. */
private final DebeziumDeserializationSchema<T> deserialization;
/** A collector to emit records in batch (bundle). */
private final DebeziumCollector debeziumCollector;
private final DebeziumOffset debeziumOffset;
private final DebeziumOffsetSerializer stateSerializer;
private final String heartbeatTopicPrefix;
private boolean isInDbSnapshotPhase;
private final Handover handover;
private volatile boolean isRunning = true;
// ---------------------------------------------------------------------------------------
// Metrics
// ---------------------------------------------------------------------------------------
/** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
private volatile long messageTimestamp = 0L;
/** The last record processing time. */
private volatile long processTime = 0L;
/**
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator.
*/
private volatile long fetchDelay = 0L;
/**
* emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
* source operator.
*/
private volatile long emitDelay = 0L;
// ------------------------------------------------------------------------
public DebeziumChangeFetcher(
SourceFunction.SourceContext<T> sourceContext,
DebeziumDeserializationSchema<T> deserialization,
boolean isInDbSnapshotPhase,
String heartbeatTopicPrefix,
Handover handover) {
this.sourceContext = sourceContext;
this.checkpointLock = sourceContext.getCheckpointLock();
this.deserialization = deserialization;
this.isInDbSnapshotPhase = isInDbSnapshotPhase;
this.heartbeatTopicPrefix = heartbeatTopicPrefix;
this.debeziumCollector = new DebeziumCollector();
this.debeziumOffset = new DebeziumOffset();
this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
this.handover = handover;
}
/**
* Take a snapshot of the Debezium handler state.
*
* <p>Important: This method must be called under the checkpoint lock.
*/
public byte[] snapshotCurrentState() throws Exception {
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock);
if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
return null;
}
return stateSerializer.serialize(debeziumOffset);
}
/**
* Process change messages from the {@link Handover} and collect the processed messages by
* {@link Collector}.
*/
public void runFetchLoop() throws Exception {
try {
// begin snapshot database phase
if (isInDbSnapshotPhase) {
List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
synchronized (checkpointLock) {
LOG.info(
"Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
handleBatch(events);
while (isRunning && isInDbSnapshotPhase) {
handleBatch(handover.pollNext());
}
}
LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
}
// begin streaming binlog phase
while (isRunning) {
// If the handover is closed or has errors, exit.
// If there is no streaming phase, the handover will be closed by the engine.
handleBatch(handover.pollNext());
}
} catch (Handover.ClosedException e) {
// ignore
}
}
public void close() {
isRunning = false;
handover.close();
}
// ---------------------------------------------------------------------------------------
// Metric getter
// ---------------------------------------------------------------------------------------
public long getFetchDelay() {
return fetchDelay;
}
public long getEmitDelay() {
return emitDelay;
}
public long getIdleTime() {
return System.currentTimeMillis() - processTime;
}
// ---------------------------------------------------------------------------------------
// Helper
// ---------------------------------------------------------------------------------------
private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
throws Exception {
if (CollectionUtils.isEmpty(changeEvents)) {
return;
}
this.processTime = System.currentTimeMillis();
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
SourceRecord record = event.value();
updateMessageTimestamp(record);
fetchDelay = processTime - messageTimestamp;
if (isHeartbeatEvent(record)) {
// keep offset update
synchronized (checkpointLock) {
debeziumOffset.setSourcePartition(record.sourcePartition());
debeziumOffset.setSourceOffset(record.sourceOffset());
}
// drop heartbeat events
continue;
}
deserialization.deserialize(record, debeziumCollector);
if (!isSnapshotRecord(record)) {
LOG.debug("Snapshot phase finishes.");
isInDbSnapshotPhase = false;
}
// emit the actual records. this also updates offset state atomically
emitRecordsUnderCheckpointLock(
debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
}
}
private void emitRecordsUnderCheckpointLock(
Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
// Emit the records. Use the checkpoint lock to guarantee
// atomicity of record emission and offset state update.
// The synchronized checkpointLock is reentrant. It's safe to sync again in snapshot mode.
synchronized (checkpointLock) {
T record;
while ((record = records.poll()) != null) {
emitDelay = System.currentTimeMillis() - messageTimestamp;
sourceContext.collect(record);
}
// update offset to state
debeziumOffset.setSourcePartition(sourcePartition);
debeziumOffset.setSourceOffset(sourceOffset);
}
}
private void updateMessageTimestamp(SourceRecord record) {
Schema schema = record.valueSchema();
Struct value = (Struct) record.value();
if (schema.field(Envelope.FieldName.SOURCE) == null) {
return;
}
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
return;
}
Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
if (tsMs != null) {
this.messageTimestamp = tsMs;
}
}
private boolean isHeartbeatEvent(SourceRecord record) {
String topic = record.topic();
return topic != null && topic.startsWith(heartbeatTopicPrefix);
}
private boolean isSnapshotRecord(SourceRecord record) {
Struct value = (Struct) record.value();
if (value != null) {
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
// even if it is the last record of snapshot, i.e. SnapshotRecord.LAST
// we can still recover from checkpoint and continue to read the binlog,
// because the checkpoint contains binlog position
return SnapshotRecord.TRUE == snapshotRecord;
}
return false;
}
// ---------------------------------------------------------------------------------------
private class DebeziumCollector implements Collector<T> {
private final Queue<T> records = new ArrayDeque<>();
@Override
public void collect(T record) {
records.add(record);
}
@Override
public void close() {}
}
}

@ -1,28 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.debezium.internal;
import org.apache.flink.annotation.Internal;
/** A reporter that can report errors to handler. */
@Internal
public interface ErrorReporter {
void reportError(Throwable error);
}

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.debezium.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.ExceptionUtils;
import io.debezium.engine.ChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The Handover is a utility to hand over data (a buffer of records) and exception from a
* <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a "size one
* blocking queue", with some extras around exception reporting, closing, and waking up thread
* without {@link Thread#interrupt() interrupting} threads.
*
* <p>This class is used in the Flink Debezium Engine Consumer to hand over data and exceptions
* between the thread that runs the DebeziumEngine class and the main thread.
*
* <p>The Handover can also be "closed", signalling from one thread to the other that it the thread
* has terminated.
*/
@ThreadSafe
@Internal
public class Handover implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
private final Object lock = new Object();
@GuardedBy("lock")
private List<ChangeEvent<SourceRecord, SourceRecord>> next;
@GuardedBy("lock")
private Throwable error;
private boolean wakeupProducer;
/**
* Polls the next element from the Handover, possibly blocking until the next element is
* available. This method behaves similar to polling from a blocking queue.
*
* <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then that
* exception is thrown rather than an element being returned.
*
* @return The next element (buffer of records, never null).
* @throws ClosedException Thrown if the Handover was {@link #close() closed}.
* @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
*/
public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
synchronized (lock) {
while (next == null && error == null) {
lock.wait();
}
List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
if (n != null) {
next = null;
lock.notifyAll();
return n;
} else {
ExceptionUtils.rethrowException(error, error.getMessage());
// this statement cannot be reached since the above method always throws an
// exception this is only here to silence the compiler and any warnings
return Collections.emptyList();
}
}
}
/**
* Hands over an element from the producer. If the Handover already has an element that was not
* yet picked up by the consumer thread, this call blocks until the consumer picks up that
* previous element.
*
* <p>This behavior is similar to a "size one" blocking queue.
*
* @param element The next element to hand over.
* @throws InterruptedException Thrown, if the thread is interrupted while blocking for the
* Handover to be empty.
*/
public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
throws InterruptedException {
checkNotNull(element);
synchronized (lock) {
while (next != null && !wakeupProducer) {
lock.wait();
}
wakeupProducer = false;
// an error marks this as closed for the producer
if (error != null) {
ExceptionUtils.rethrow(error, error.getMessage());
} else {
// if there is no error, then this is open and can accept this element
next = element;
lock.notifyAll();
}
}
}
/**
* Reports an exception. The consumer will throw the given exception immediately, if it is
* currently blocked in the {@link #pollNext()} method, or the next time it calls that method.
*
* <p>After this method has been called, no call to either {@link #produce( List)} or {@link
* #pollNext()} will ever return regularly any more, but will always return exceptionally.
*
* <p>If another exception was already reported, this method does nothing.
*
* <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
*
* @param t The exception to report.
*/
public void reportError(Throwable t) {
checkNotNull(t);
synchronized (lock) {
LOG.error("Reporting error:", t);
// do not override the initial exception
if (error == null) {
error = t;
}
next = null;
lock.notifyAll();
}
}
/**
* Return whether there is an error.
*
* @return whether there is an error
*/
public boolean hasError() {
return error != null;
}
/**
* Closes the handover. Both the {@link #produce(List)} method and the {@link #pollNext()} will
* throw a {@link ClosedException} on any currently blocking and future invocations.
*
* <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
* that exception will not be overridden. The consumer thread will throw that exception upon
* calling {@link #pollNext()}, rather than the {@code ClosedException}.
*/
@Override
public void close() {
synchronized (lock) {
next = null;
wakeupProducer = false;
if (error == null) {
error = new ClosedException();
}
lock.notifyAll();
}
}
// ------------------------------------------------------------------------
/**
* An exception thrown by the Handover in the {@link #pollNext()} or {@link #produce(List)}
* method, after the Handover was closed via {@link #close()}.
*/
public static final class ClosedException extends Exception {
private static final long serialVersionUID = 1L;
}
}

@ -6,13 +6,13 @@
package io.debezium.embedded;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
/**
* Copied from Debezium project. Make it public to be accessible from {@link
* com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer}.
* Copied from Debezium project. Make it public to be accessible from {@link DebeziumChangeFetcher}.
*/
public class EmbeddedEngineChangeEvent<K, V> implements ChangeEvent<K, V>, RecordChangeEvent<V> {

Loading…
Cancel
Save