From 679b94034122e310173ce962445eeb9d79b47411 Mon Sep 17 00:00:00 2001
From: Shengkai <33114724+fsk119@users.noreply.github.com>
Date: Thu, 3 Jun 2021 19:29:22 +0800
Subject: [PATCH] [common] Add support for JDK 9+ (#194)
---
.../cdc/debezium/DebeziumSourceFunction.java | 150 +++++----
.../internal/DebeziumChangeConsumer.java | 253 ++-------------
.../internal/DebeziumChangeFetcher.java | 297 ++++++++++++++++++
.../cdc/debezium/internal/ErrorReporter.java | 28 --
.../cdc/debezium/internal/Handover.java | 197 ++++++++++++
.../embedded/EmbeddedEngineChangeEvent.java | 4 +-
6 files changed, 596 insertions(+), 333 deletions(-)
create mode 100644 flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeFetcher.java
delete mode 100644 flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/ErrorReporter.java
create mode 100644 flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/Handover.java
diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java
index b111eae21..786b2564b 100644
--- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java
+++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java
@@ -40,10 +40,12 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffset;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.alibaba.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
import com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.alibaba.ververica.cdc.debezium.internal.Handover;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
@@ -71,6 +73,22 @@ import java.util.concurrent.TimeUnit;
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
* from databases into Flink.
*
+ * <p>There are two workers during the runtime. One worker periodically pulls records from the
+ * database and pushes them into the {@link Handover}. The other worker consumes the records from
+ * the {@link Handover} and converts them into Flink-style data. The reason for not using a single
+ * worker is that Debezium behaves differently in the snapshot phase and the streaming phase.
+ *
+ * <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the
+ * consumer. Because the two threads don't communicate with each other directly, error reporting
+ * also relies on the {@link Handover}. When the engine gets an error, it uses the {@link
+ * DebeziumEngine.CompletionCallback} to report the error to the {@link Handover} and wakes up the
+ * consumer to check the error. However, if the error comes from the Flink side, the source
+ * function just closes the engine and wakes up the producer.
+ *
+ * <p>If the execution is canceled or finishes (only in the snapshot phase), the exit logic is the
+ * same as the error-reporting logic.
+ *
* <p>The source function participates in checkpointing and guarantees that no data is lost during a
* failure, and that the computation processes elements "exactly once".
*
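As an aside, here is a minimal, runnable sketch (not part of the patch) of the two-worker hand-over described above. It uses a JDK ArrayBlockingQueue of capacity one as a stand-in for the Handover class added later in this patch; the TwoWorkerSketch class and the "change-N" records are illustrative only.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class TwoWorkerSketch {
        public static void main(String[] args) throws Exception {
            // stand-in for the Handover: a "size one" blocking queue between the two workers
            BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);

            // worker 1: periodically pulls records from the database and pushes them into the buffer
            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 3; i++) {
                        handover.put(Arrays.asList("change-" + i)); // blocks until the batch is taken
                    }
                    handover.put(Arrays.asList("END")); // sentinel instead of Handover#close()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // worker 2: consumes the batches and converts them into Flink-style records
            while (true) {
                List<String> batch = handover.take();
                if (batch.contains("END")) {
                    break;
                }
                batch.forEach(record -> System.out.println("emit " + record));
            }
            producer.join();
        }
    }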
@@ -96,7 +114,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
- // -------------------------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------------------
+ // Properties
+ // ---------------------------------------------------------------------------------------
/** The schema to convert from Debezium's messages into Flink's objects. */
private final DebeziumDeserializationSchema<T> deserializer;
@@ -110,20 +130,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Data for pending but uncommitted offsets. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
- private ExecutorService executor;
- private DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine;
-
- /** The error from {@link #engine} thread. */
- private transient volatile Throwable error;
-
- /** Flag indicating whether the consumer is still running. */
- private volatile boolean running = true;
-
/** Flag indicating whether the Debezium Engine is started. */
private volatile boolean debeziumStarted = false;
- /** The consumer to fetch records from {@link DebeziumEngine}. */
- private transient volatile DebeziumChangeConsumer<T> debeziumConsumer;
+ // ---------------------------------------------------------------------------------------
+ // State
+ // ---------------------------------------------------------------------------------------
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
@@ -145,12 +157,29 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
*/
private transient ListState<String> historyRecordsState;
+ // ---------------------------------------------------------------------------------------
+ // Worker
+ // ---------------------------------------------------------------------------------------
+
+ private transient ExecutorService executor;
+ private transient DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine;
/**
* Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
* generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
*/
private transient String engineInstanceName;
+ /** Consumes the change events from the engine and commits the offsets back to the engine. */
+ private transient DebeziumChangeConsumer changeConsumer;
+
+ /** The fetcher that consumes records from the {@link Handover} and emits them to Flink. */
+ private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+ /** Buffers the change events from the engine and records the errors reported by Debezium. */
+ private transient Handover handover;
+
+ // ---------------------------------------------------------------------------------------
+
public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@@ -166,6 +195,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
+ this.handover = new Handover();
+ this.changeConsumer = new DebeziumChangeConsumer(handover);
}
// ------------------------------------------------------------------------
@@ -248,7 +279,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
- if (!running) {
+ if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
@@ -259,20 +290,20 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private void snapshotOffsetState(long checkpointId) throws Exception {
offsetState.clear();
- final DebeziumChangeConsumer<?> consumer = this.debeziumConsumer;
+ final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
byte[] serializedOffset = null;
- if (consumer == null) {
- // the consumer has not yet been initialized, which means we need to return the
+ if (fetcher == null) {
+ // the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets
if (restoredOffsetState != null) {
serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
}
} else {
- byte[] currentState = consumer.snapshotCurrentState();
+ byte[] currentState = fetcher.snapshotCurrentState();
if (currentState == null && restoredOffsetState != null) {
- // the consumer has been initialized, but has not yet received any data,
- // which means we need to return the originally restored offsets
+ // the fetcher has been initialized, but has not yet received any data,
+ // which means we need to return the originally restored offsets.
serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
} else {
serializedOffset = currentState;
@@ -345,32 +376,31 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
properties.getProperty(
Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
- this.debeziumConsumer =
- new DebeziumChangeConsumer<>(
+ this.debeziumChangeFetcher =
+ new DebeziumChangeFetcher<>(
sourceContext,
deserializer,
restoredOffsetState == null, // DB snapshot phase if restore state is null
- this::reportError,
- dbzHeartbeatPrefix);
+ dbzHeartbeatPrefix,
+ handover);
// create the engine with this configuration ...
this.engine =
DebeziumEngine.create(Connect.class)
.using(properties)
- .notifying(debeziumConsumer)
+ .notifying(changeConsumer)
.using(OffsetCommitPolicy.always())
.using(
(success, message, error) -> {
- if (!success && error != null) {
- this.reportError(error);
+ if (success) {
+ // Close the handover and prepare to exit.
+ handover.close();
+ } else {
+ handover.reportError(error);
}
})
.build();
- if (!running) {
- return;
- }
-
// run the engine asynchronously
executor.execute(engine);
debeziumStarted = true;
@@ -378,40 +408,27 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
// initialize metrics
MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
metricGroup.gauge(
- "currentFetchEventTimeLag", (Gauge) () -> debeziumConsumer.getFetchDelay());
+ "currentFetchEventTimeLag",
+ (Gauge) () -> debeziumChangeFetcher.getFetchDelay());
+ metricGroup.gauge(
+ "currentEmitEventTimeLag",
+ (Gauge) () -> debeziumChangeFetcher.getEmitDelay());
metricGroup.gauge(
- "currentEmitEventTimeLag", (Gauge) () -> debeziumConsumer.getEmitDelay());
- metricGroup.gauge("sourceIdleTime", (Gauge) () -> debeziumConsumer.getIdleTime());
+ "sourceIdleTime", (Gauge) () -> debeziumChangeFetcher.getIdleTime());
- // on a clean exit, wait for the runner thread
- try {
- while (running) {
- if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
- break;
- }
- if (error != null) {
- running = false;
- shutdownEngine();
- // rethrow the error from Debezium consumer
- ExceptionUtils.rethrow(error);
- }
- }
- } catch (InterruptedException e) {
- // may be the result of a wake-up interruption after an exception.
- // we ignore this here and only restore the interruption state
- Thread.currentThread().interrupt();
- }
+ // start the real debezium consumer
+ debeziumChangeFetcher.runFetchLoop();
}
@Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- if (!running) {
- LOG.debug("notifyCheckpointComplete() called on closed source");
+ public void notifyCheckpointComplete(long checkpointId) {
+ if (!debeziumStarted) {
+ LOG.debug("notifyCheckpointComplete() called when engine is not started.");
return;
}
- final DebeziumChangeConsumer<T> consumer = this.debeziumConsumer;
- if (consumer == null) {
+ final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
+ if (fetcher == null) {
LOG.debug("notifyCheckpointComplete() called on uninitialized source");
return;
}
@@ -442,7 +459,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
DebeziumOffset offset =
DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
- consumer.commitOffset(offset);
+ changeConsumer.commitOffset(offset);
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
@@ -451,10 +468,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
@Override
public void cancel() {
- // flag the main thread to exit. A thread interrupt will come anyways.
- running = false;
// safely and gracefully stop the engine
shutdownEngine();
+ debeziumChangeFetcher.close();
}
@Override
@@ -468,15 +484,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
super.close();
}
- // --------------------------------------------------------------------------------
- // Error callbacks
- // --------------------------------------------------------------------------------
-
- private void reportError(Throwable error) {
- LOG.error("Reporting error:", error);
- this.error = error;
- }
-
/** Safely and gracefully stop the Debezium engine. */
private void shutdownEngine() {
try {
@@ -487,9 +494,14 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
ExceptionUtils.rethrow(e);
} finally {
if (executor != null) {
- executor.shutdown();
+ executor.shutdownNow();
}
+
debeziumStarted = false;
+
+ if (handover != null) {
+ handover.close();
+ }
}
}
diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java
index 574793339..2f782483c 100644
--- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java
+++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java
@@ -19,246 +19,55 @@
package com.alibaba.ververica.cdc.debezium.internal;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.memory.MemoryUtils;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import io.debezium.connector.SnapshotRecord;
-import io.debezium.data.Envelope;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
-/**
- * A consumer that consumes change messages from {@link DebeziumEngine}.
- *
- * @param <T> The type of elements produced by the consumer.
- */
+/** Consumes Debezium change events and hands them over to the {@link Handover}. */
@Internal
-public class DebeziumChangeConsumer<T>
+public class DebeziumChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
-
public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
+ private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
- private final SourceFunction.SourceContext<T> sourceContext;
-
- /**
- * The lock that guarantees that record emission and state updates are atomic, from the view of
- * taking a checkpoint.
- */
- private final Object checkpointLock;
-
- /** The schema to convert from Debezium's messages into Flink's objects. */
- private final DebeziumDeserializationSchema<T> deserialization;
-
- /** A collector to emit records in batch (bundle). * */
- private final DebeziumCollector debeziumCollector;
-
- private final ErrorReporter errorReporter;
-
- private final DebeziumOffset debeziumOffset;
-
- private final DebeziumOffsetSerializer stateSerializer;
-
- private final String heartbeatTopicPrefix;
-
- private boolean isInDbSnapshotPhase;
-
- private boolean lockHold = false;
-
- // ---------------------------------------------------------------------------------------
- // Metrics
- // ---------------------------------------------------------------------------------------
-
- /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
- private volatile long messageTimestamp = 0L;
-
- /** The last record processing time. */
- private volatile long processTime = 0L;
-
- /**
- * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
- * record fetched into the source operator.
- */
- private volatile long fetchDelay = 0L;
-
- /**
- * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
- * source operator.
- */
- private volatile long emitDelay = 0L;
-
- private DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>>
- currentCommitter;
-
- // ------------------------------------------------------------------------
+ private final Handover handover;
+ // volatile to keep modifications visible to the source function thread
+ private volatile RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;
- public DebeziumChangeConsumer(
- SourceFunction.SourceContext<T> sourceContext,
- DebeziumDeserializationSchema<T> deserialization,
- boolean isInDbSnapshotPhase,
- ErrorReporter errorReporter,
- String heartbeatTopicPrefix) {
- this.sourceContext = sourceContext;
- this.checkpointLock = sourceContext.getCheckpointLock();
- this.deserialization = deserialization;
- this.isInDbSnapshotPhase = isInDbSnapshotPhase;
- this.heartbeatTopicPrefix = heartbeatTopicPrefix;
- this.debeziumCollector = new DebeziumCollector();
- this.errorReporter = errorReporter;
- this.debeziumOffset = new DebeziumOffset();
- this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
+ public DebeziumChangeConsumer(Handover handover) {
+ this.handover = handover;
}
@Override
public void handleBatch(
- List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents,
- DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
- throws InterruptedException {
- this.currentCommitter = committer;
- this.processTime = System.currentTimeMillis();
+ List<ChangeEvent<SourceRecord, SourceRecord>> events,
+ RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
try {
- for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
- SourceRecord record = event.value();
- updateMessageTimestamp(record);
- fetchDelay = processTime - messageTimestamp;
-
- if (isHeartbeatEvent(record)) {
- // keep offset update
- synchronized (checkpointLock) {
- debeziumOffset.setSourcePartition(record.sourcePartition());
- debeziumOffset.setSourceOffset(record.sourceOffset());
- }
- // drop heartbeat events
- continue;
- }
-
- deserialization.deserialize(record, debeziumCollector);
-
- if (isInDbSnapshotPhase) {
- if (!lockHold) {
- MemoryUtils.UNSAFE.monitorEnter(checkpointLock);
- lockHold = true;
- LOG.info(
- "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
- }
- if (!isSnapshotRecord(record)) {
- MemoryUtils.UNSAFE.monitorExit(checkpointLock);
- isInDbSnapshotPhase = false;
- LOG.info(
- "Received record from streaming binlog phase, released checkpoint lock.");
- }
- }
-
- // emit the actual records. this also updates offset state atomically
- emitRecordsUnderCheckpointLock(
- debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
- }
- } catch (Exception e) {
- LOG.error("Error happens when consuming change messages.", e);
- errorReporter.reportError(e);
+ currentCommitter = recordCommitter;
+ handover.produce(events);
+ } catch (Throwable e) {
+ // Hold this exception in handover and trigger the fetcher to exit
+ handover.reportError(e);
}
}
- private void updateMessageTimestamp(SourceRecord record) {
- Schema schema = record.valueSchema();
- Struct value = (Struct) record.value();
- if (schema.field(Envelope.FieldName.SOURCE) == null) {
- return;
- }
-
- Struct source = value.getStruct(Envelope.FieldName.SOURCE);
- if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
- return;
- }
-
- Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
- if (tsMs != null) {
- this.messageTimestamp = tsMs;
- }
- }
-
- private boolean isHeartbeatEvent(SourceRecord record) {
- String topic = record.topic();
- return topic != null && topic.startsWith(heartbeatTopicPrefix);
- }
-
- private boolean isSnapshotRecord(SourceRecord record) {
- Struct value = (Struct) record.value();
- if (value != null) {
- Struct source = value.getStruct(Envelope.FieldName.SOURCE);
- SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
- // even if it is the last record of snapshot, i.e. SnapshotRecord.LAST
- // we can still recover from checkpoint and continue to read the binlog,
- // because the checkpoint contains binlog position
- return SnapshotRecord.TRUE == snapshotRecord;
- }
- return false;
- }
-
- private void emitRecordsUnderCheckpointLock(
- Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset)
- throws InterruptedException {
- if (isInDbSnapshotPhase) {
- // lockHolderThread holds the lock, don't need to hold it again
- emitRecords(records, sourcePartition, sourceOffset);
- } else {
- // emit the records, using the checkpoint lock to guarantee
- // atomicity of record emission and offset state update
- synchronized (checkpointLock) {
- emitRecords(records, sourcePartition, sourceOffset);
- }
- }
- }
-
- /** Emits a batch of records. */
- private void emitRecords(
- Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
- long currentTimestamp = System.currentTimeMillis();
- T record;
- while ((record = records.poll()) != null) {
- emitDelay = currentTimestamp - messageTimestamp;
- sourceContext.collect(record);
- }
- // update offset to state
- debeziumOffset.setSourcePartition(sourcePartition);
- debeziumOffset.setSourceOffset(sourceOffset);
- }
-
- /**
- * Takes a snapshot of the Debezium Consumer state.
- *
- * <p>Important: This method must be called under the checkpoint lock.
- */
- public byte[] snapshotCurrentState() throws Exception {
- // this method assumes that the checkpoint lock is held
- assert Thread.holdsLock(checkpointLock);
- if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
- return null;
- }
-
- return stateSerializer.serialize(debeziumOffset);
- }
-
@SuppressWarnings("unchecked")
public void commitOffset(DebeziumOffset offset) throws InterruptedException {
+ // Although the committer is read and written by multiple threads, it is not changed frequently.
if (currentCommitter == null) {
LOG.info(
- "commitOffset() called on Debezium ChangeConsumer which doesn't receive records yet.");
+ "commitOffset() called on Debezium change consumer which doesn't receive records yet.");
return;
}
@@ -276,18 +85,6 @@ public class DebeziumChangeConsumer
currentCommitter.markBatchFinished();
}
- public long getFetchDelay() {
- return fetchDelay;
- }
-
- public long getEmitDelay() {
- return emitDelay;
- }
-
- public long getIdleTime() {
- return System.currentTimeMillis() - processTime;
- }
-
/**
* We have to adjust type of LSN values to Long, because it might be Integer after
* deserialization, however {@code
@@ -305,16 +102,4 @@ public class DebeziumChangeConsumer
}
return sourceOffset;
}
-
- private class DebeziumCollector implements Collector<T> {
- private final Queue<T> records = new ArrayDeque<>();
-
- @Override
- public void collect(T record) {
- records.add(record);
- }
-
- @Override
- public void close() {}
- }
}
diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeFetcher.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeFetcher.java
new file mode 100644
index 000000000..3256cc91e
--- /dev/null
+++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeFetcher.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.ververica.cdc.debezium.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.data.Envelope;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * A handler that converts change messages from {@link DebeziumEngine} into Flink data. Because
+ * Debezium uses different locking strategies in different modes (e.g. snapshot), the handler also
+ * needs different strategies. In the snapshot phase, the handler needs to hold the checkpoint lock
+ * until the snapshot finishes; in the non-snapshot phase, it only needs to hold the lock while
+ * emitting the records.
+ *
+ * @param <T> The type of elements produced by the handler.
+ */
+@Internal
+public class DebeziumChangeFetcher<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);
+
+ private final SourceFunction.SourceContext<T> sourceContext;
+
+ /**
+ * The lock that guarantees that record emission and state updates are atomic, from the view of
+ * taking a checkpoint.
+ */
+ private final Object checkpointLock;
+
+ /** The schema to convert from Debezium's messages into Flink's objects. */
+ private final DebeziumDeserializationSchema<T> deserialization;
+
+ /** A collector to emit records in batch (bundle). */
+ private final DebeziumCollector debeziumCollector;
+
+ private final DebeziumOffset debeziumOffset;
+
+ private final DebeziumOffsetSerializer stateSerializer;
+
+ private final String heartbeatTopicPrefix;
+
+ private boolean isInDbSnapshotPhase;
+
+ private final Handover handover;
+
+ private volatile boolean isRunning = true;
+
+ // ---------------------------------------------------------------------------------------
+ // Metrics
+ // ---------------------------------------------------------------------------------------
+
+ /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
+ private volatile long messageTimestamp = 0L;
+
+ /** The last record processing time. */
+ private volatile long processTime = 0L;
+
+ /**
+ * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
+ * record fetched into the source operator.
+ */
+ private volatile long fetchDelay = 0L;
+
+ /**
+ * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
+ * source operator.
+ */
+ private volatile long emitDelay = 0L;
+
+ // ------------------------------------------------------------------------
+
+ public DebeziumChangeFetcher(
+ SourceFunction.SourceContext<T> sourceContext,
+ DebeziumDeserializationSchema<T> deserialization,
+ boolean isInDbSnapshotPhase,
+ String heartbeatTopicPrefix,
+ Handover handover) {
+ this.sourceContext = sourceContext;
+ this.checkpointLock = sourceContext.getCheckpointLock();
+ this.deserialization = deserialization;
+ this.isInDbSnapshotPhase = isInDbSnapshotPhase;
+ this.heartbeatTopicPrefix = heartbeatTopicPrefix;
+ this.debeziumCollector = new DebeziumCollector();
+ this.debeziumOffset = new DebeziumOffset();
+ this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
+ this.handover = handover;
+ }
+
+ /**
+ * Take a snapshot of the Debezium handler state.
+ *
+ * <p>Important: This method must be called under the checkpoint lock.
+ */
+ public byte[] snapshotCurrentState() throws Exception {
+ // this method assumes that the checkpoint lock is held
+ assert Thread.holdsLock(checkpointLock);
+ if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
+ return null;
+ }
+
+ return stateSerializer.serialize(debeziumOffset);
+ }
+
+ /**
+ * Processes change messages from the {@link Handover} and collects the processed messages with
+ * the {@link Collector}.
+ */
+ public void runFetchLoop() throws Exception {
+ try {
+ // begin snapshot database phase
+ if (isInDbSnapshotPhase) {
+ List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
+
+ synchronized (checkpointLock) {
+ LOG.info(
+ "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
+ handleBatch(events);
+ while (isRunning && isInDbSnapshotPhase) {
+ handleBatch(handover.pollNext());
+ }
+ }
+ LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
+ }
+
+ // begin streaming binlog phase
+ while (isRunning) {
+ // If the handover is closed or has errors, exit.
+ // If there is no streaming phase, the handover will be closed by the engine.
+ handleBatch(handover.pollNext());
+ }
+ } catch (Handover.ClosedException e) {
+ // ignore
+ }
+ }
+
+ public void close() {
+ isRunning = false;
+ handover.close();
+ }
+
+ // ---------------------------------------------------------------------------------------
+ // Metric getter
+ // ---------------------------------------------------------------------------------------
+
+ public long getFetchDelay() {
+ return fetchDelay;
+ }
+
+ public long getEmitDelay() {
+ return emitDelay;
+ }
+
+ public long getIdleTime() {
+ return System.currentTimeMillis() - processTime;
+ }
+
+ // ---------------------------------------------------------------------------------------
+ // Helper
+ // ---------------------------------------------------------------------------------------
+
+ private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
+ throws Exception {
+ if (CollectionUtils.isEmpty(changeEvents)) {
+ return;
+ }
+ this.processTime = System.currentTimeMillis();
+
+ for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
+ SourceRecord record = event.value();
+ updateMessageTimestamp(record);
+ fetchDelay = processTime - messageTimestamp;
+
+ if (isHeartbeatEvent(record)) {
+ // keep offset update
+ synchronized (checkpointLock) {
+ debeziumOffset.setSourcePartition(record.sourcePartition());
+ debeziumOffset.setSourceOffset(record.sourceOffset());
+ }
+ // drop heartbeat events
+ continue;
+ }
+
+ deserialization.deserialize(record, debeziumCollector);
+
+ if (!isSnapshotRecord(record)) {
+ LOG.debug("Snapshot phase finishes.");
+ isInDbSnapshotPhase = false;
+ }
+
+ // emit the actual records. this also updates offset state atomically
+ emitRecordsUnderCheckpointLock(
+ debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
+ }
+ }
+
+ private void emitRecordsUnderCheckpointLock(
+ Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
+ // Emit the records. Use the checkpoint lock to guarantee
+ // atomicity of record emission and offset state update.
+ // The synchronized checkpointLock is reentrant. It's safe to sync again in snapshot mode.
+ synchronized (checkpointLock) {
+ T record;
+ while ((record = records.poll()) != null) {
+ emitDelay = System.currentTimeMillis() - messageTimestamp;
+ sourceContext.collect(record);
+ }
+ // update offset to state
+ debeziumOffset.setSourcePartition(sourcePartition);
+ debeziumOffset.setSourceOffset(sourceOffset);
+ }
+ }
+
+ private void updateMessageTimestamp(SourceRecord record) {
+ Schema schema = record.valueSchema();
+ Struct value = (Struct) record.value();
+ if (schema.field(Envelope.FieldName.SOURCE) == null) {
+ return;
+ }
+
+ Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+ if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
+ return;
+ }
+
+ Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
+ if (tsMs != null) {
+ this.messageTimestamp = tsMs;
+ }
+ }
+
+ private boolean isHeartbeatEvent(SourceRecord record) {
+ String topic = record.topic();
+ return topic != null && topic.startsWith(heartbeatTopicPrefix);
+ }
+
+ private boolean isSnapshotRecord(SourceRecord record) {
+ Struct value = (Struct) record.value();
+ if (value != null) {
+ Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+ SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
+ // even if it is the last record of snapshot, i.e. SnapshotRecord.LAST
+ // we can still recover from checkpoint and continue to read the binlog,
+ // because the checkpoint contains binlog position
+ return SnapshotRecord.TRUE == snapshotRecord;
+ }
+ return false;
+ }
+
+ // ---------------------------------------------------------------------------------------
+
+ private class DebeziumCollector implements Collector<T> {
+
+ private final Queue<T> records = new ArrayDeque<>();
+
+ @Override
+ public void collect(T record) {
+ records.add(record);
+ }
+
+ @Override
+ public void close() {}
+ }
+}
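As an aside, the nested synchronized block in emitRecordsUnderCheckpointLock is safe even while runFetchLoop already holds the checkpoint lock for the whole snapshot phase, because Java intrinsic locks are reentrant. The sketch below (not part of the patch; class and method names are illustrative) demonstrates that a thread can re-enter the same monitor without blocking, which is the property the fetch loop relies on.

    public class ReentrantMonitorSketch {
        private final Object checkpointLock = new Object();

        public static void main(String[] args) {
            new ReentrantMonitorSketch().snapshotPhase();
        }

        void snapshotPhase() {
            // held for the whole snapshot phase, so no checkpoint can run in between
            synchronized (checkpointLock) {
                emitBatch();
            }
        }

        void emitBatch() {
            // re-entered by the same thread; does not deadlock
            synchronized (checkpointLock) {
                System.out.println("emit records and update the offset atomically");
            }
        }
    }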
diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/ErrorReporter.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/ErrorReporter.java
deleted file mode 100644
index 2afd5d420..000000000
--- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/ErrorReporter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.ververica.cdc.debezium.internal;
-
-import org.apache.flink.annotation.Internal;
-
-/** A reporter that can report errors to handler. */
-@Internal
-public interface ErrorReporter {
-
- void reportError(Throwable error);
-}
diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/Handover.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/Handover.java
new file mode 100644
index 000000000..8a46671c2
--- /dev/null
+++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/Handover.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.ververica.cdc.debezium.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.ExceptionUtils;
+
+import io.debezium.engine.ChangeEvent;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Handover is a utility to hand over data (a buffer of records) and exceptions from a
+ * producer thread to a consumer thread. It effectively behaves like a "size one blocking
+ * queue", with some extras around exception reporting, closing, and waking up threads
+ * without {@link Thread#interrupt() interrupting} them.
+ *
+ * <p>This class is used in the Flink Debezium Engine Consumer to hand over data and exceptions
+ * between the thread that runs the DebeziumEngine class and the main thread.
+ *
+ * <p>The Handover can also be "closed", signalling from one thread to the other that the thread
+ * has terminated.
+ */
+@ThreadSafe
+@Internal
+public class Handover implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private List<ChangeEvent<SourceRecord, SourceRecord>> next;
+
+ @GuardedBy("lock")
+ private Throwable error;
+
+ private boolean wakeupProducer;
+
+ /**
+ * Polls the next element from the Handover, possibly blocking until the next element is
+ * available. This method behaves similar to polling from a blocking queue.
+ *
+ * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then that
+ * exception is thrown rather than an element being returned.
+ *
+ * @return The next element (buffer of records, never null).
+ * @throws ClosedException Thrown if the Handover was {@link #close() closed}.
+ * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method.
+ */
+ public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
+ synchronized (lock) {
+ while (next == null && error == null) {
+ lock.wait();
+ }
+ List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
+ if (n != null) {
+ next = null;
+ lock.notifyAll();
+ return n;
+ } else {
+ ExceptionUtils.rethrowException(error, error.getMessage());
+
+ // this statement cannot be reached since the above method always throws an exception;
+ // it is only here to silence the compiler and any warnings
+ return Collections.emptyList();
+ }
+ }
+ }
+
+ /**
+ * Hands over an element from the producer. If the Handover already has an element that was not
+ * yet picked up by the consumer thread, this call blocks until the consumer picks up that
+ * previous element.
+ *
+ * <p>This behavior is similar to a "size one" blocking queue.
+ *
+ * @param element The next element to hand over.
+ * @throws InterruptedException Thrown, if the thread is interrupted while blocking for the
+ * Handover to be empty.
+ */
+ public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
+ throws InterruptedException {
+
+ checkNotNull(element);
+
+ synchronized (lock) {
+ while (next != null && !wakeupProducer) {
+ lock.wait();
+ }
+
+ wakeupProducer = false;
+
+ // an error marks this as closed for the producer
+ if (error != null) {
+ ExceptionUtils.rethrow(error, error.getMessage());
+ } else {
+ // if there is no error, then this is open and can accept this element
+ next = element;
+ lock.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Reports an exception. The consumer will throw the given exception immediately, if it is
+ * currently blocked in the {@link #pollNext()} method, or the next time it calls that method.
+ *
+ * <p>After this method has been called, no call to either {@link #produce(List)} or {@link
+ * #pollNext()} will ever return regularly any more, but will always return exceptionally.
+ *
+ * <p>If another exception was already reported, this method does nothing.
+ *
+ * <p>For the producer, the Handover will appear as if it was {@link #close() closed}.
+ *
+ * @param t The exception to report.
+ */
+ public void reportError(Throwable t) {
+ checkNotNull(t);
+
+ synchronized (lock) {
+ LOG.error("Reporting error:", t);
+ // do not override the initial exception
+ if (error == null) {
+ error = t;
+ }
+ next = null;
+ lock.notifyAll();
+ }
+ }
+
+ /**
+ * Return whether there is an error.
+ *
+ * @return whether there is an error
+ */
+ public boolean hasError() {
+ return error != null;
+ }
+
+ /**
+ * Closes the handover. Both the {@link #produce(List)} method and the {@link #pollNext()} will
+ * throw a {@link ClosedException} on any currently blocking and future invocations.
+ *
+ *
+ * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method,
+ * that exception will not be overridden. The consumer thread will throw that exception upon
+ * calling {@link #pollNext()}, rather than the {@code ClosedException}.
+ */
+ @Override
+ public void close() {
+ synchronized (lock) {
+ next = null;
+ wakeupProducer = false;
+
+ if (error == null) {
+ error = new ClosedException();
+ }
+ lock.notifyAll();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * An exception thrown by the Handover in the {@link #pollNext()} or {@link #produce(List)}
+ * method, after the Handover was closed via {@link #close()}.
+ */
+ public static final class ClosedException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ }
+}
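As an aside, here is a small usage sketch (not part of the patch) of the Handover added above: a producer thread hands over a few batches and then closes, while the consumer polls until it sees the ClosedException, mirroring the fetch loop in DebeziumChangeFetcher. The empty batches and the HandoverSketch class name are illustrative only.

    import com.alibaba.ververica.cdc.debezium.internal.Handover;
    import io.debezium.engine.ChangeEvent;
    import org.apache.kafka.connect.source.SourceRecord;

    import java.util.Collections;
    import java.util.List;

    public class HandoverSketch {
        public static void main(String[] args) throws Exception {
            Handover handover = new Handover();

            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 3; i++) {
                        // real code hands over the batches received from the Debezium engine
                        handover.produce(Collections.emptyList());
                    }
                } catch (Throwable t) {
                    handover.reportError(t);
                } finally {
                    // note: close() drops a batch the consumer has not picked up yet
                    handover.close();
                }
            });
            producer.start();

            try {
                while (true) {
                    List<ChangeEvent<SourceRecord, SourceRecord>> batch = handover.pollNext();
                    System.out.println("got a batch of " + batch.size() + " events");
                }
            } catch (Handover.ClosedException e) {
                System.out.println("handover closed, exiting the fetch loop");
            }
            producer.join();
        }
    }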
diff --git a/flink-connector-debezium/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java b/flink-connector-debezium/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java
index 0ca85eeff..a0608cb23 100644
--- a/flink-connector-debezium/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java
+++ b/flink-connector-debezium/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java
@@ -6,13 +6,13 @@
package io.debezium.embedded;
+import com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeFetcher;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
/**
- * Copied from Debezium project. Make it public to be accessible from {@link
- * com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer}.
+ * Copied from Debezium project. Make it public to be accessible from {@link DebeziumChangeFetcher}.
*/
public class EmbeddedEngineChangeEvent<K, V> implements ChangeEvent<K, V>, RecordChangeEvent<V> {