[postgres] Flush LSN offset to PG after checkpoint complete to avoid infinite WAL segments

release-1.2
Jark Wu 4 years ago
parent 0ce58b4ff5
commit 4127661092
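This commit makes the source report consumed offsets back to the Debezium engine when a Flink checkpoint completes, so Debezium can advance the replication slot's confirmed flush LSN and PostgreSQL can recycle old WAL segments instead of retaining them indefinitely. The moving parts are the embedded engine's offset commit policy and heartbeat support, both visible in the diff below. A minimal sketch of that engine wiring, assuming some `changeConsumer` implementing DebeziumEngine.ChangeConsumer; the property values are illustrative:

import io.debezium.embedded.Connect;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Properties;

Properties props = new Properties();
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
// heartbeats keep handleBatch() firing even when the captured tables are idle
props.setProperty("heartbeat.interval.ms", "1000"); // illustrative value

DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine =
        DebeziumEngine.create(Connect.class)
                .using(props)
                .notifying(changeConsumer)            // assumed ChangeConsumer implementation
                .using(OffsetCommitPolicy.always())   // flush offsets on every finished batch
                .build();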

@@ -19,6 +19,8 @@
 package com.alibaba.ververica.cdc.debezium;

 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -44,7 +46,10 @@ import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.embedded.Connect;
 import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
 import io.debezium.relational.history.HistoryRecord;
+import org.apache.commons.collections.map.LinkedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,6 +80,7 @@ import java.util.concurrent.TimeUnit;
 @PublicEvolving
 public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
         CheckpointedFunction,
+        CheckpointListener,
         ResultTypeQueryable<T> {

     private static final long serialVersionUID = -5808108641062931623L;
@@ -87,6 +93,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
     /** State name of the consumer's history records state. */
     public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";

+    /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    // -------------------------------------------------------------------------------------------
+
     /** The schema to convert from Debezium's messages into Flink's objects. */
     private final DebeziumDeserializationSchema<T> deserializer;
@@ -96,6 +107,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
     /** The specific binlog offset to read from when the first startup. */
     private final @Nullable DebeziumOffset specificOffset;

+    /** Data for pending but uncommitted offsets. */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
     private ExecutorService executor;
     private DebeziumEngine<?> engine;
@@ -223,12 +237,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
         if (!running) {
             LOG.debug("snapshotState() called on closed source");
         } else {
-            snapshotOffsetState();
+            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
             snapshotHistoryRecordsState();
         }
     }

-    private void snapshotOffsetState() throws Exception {
+    private void snapshotOffsetState(long checkpointId) throws Exception {
         offsetState.clear();

         final DebeziumChangeConsumer<?> consumer = this.debeziumConsumer;
@@ -254,6 +268,14 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
         if (serializedOffset != null) {
             offsetState.add(serializedOffset);
+            // the map cannot be asynchronously updated, because only one checkpoint call
+            // can happen on this function at a time: either snapshotState() or
+            // notifyCheckpointComplete()
+            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+            // truncate the map of pending offsets to commit, to prevent infinite growth
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                pendingOffsetsToCommit.remove(0);
+            }
         }
     }
@@ -301,16 +323,22 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
         // history instance name to initialize FlinkDatabaseHistory
         properties.setProperty(FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);

+        // we have to filter out the heartbeat events, otherwise the deserializer will fail
+        String dbzHeartbeatPrefix = properties.getProperty(
+            Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+            Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
         this.debeziumConsumer = new DebeziumChangeConsumer<>(
             sourceContext,
             deserializer,
             restoredOffsetState == null, // DB snapshot phase if restore state is null
-            this::reportError);
+            this::reportError,
+            dbzHeartbeatPrefix);

         // create the engine with this configuration ...
         this.engine = DebeziumEngine.create(Connect.class)
             .using(properties)
             .notifying(debeziumConsumer)
+            .using(OffsetCommitPolicy.always())
             .using((success, message, error) -> {
                 if (!success && error != null) {
                     this.reportError(error);
@@ -346,6 +374,51 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
         }
     }

+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (!running) {
+            LOG.debug("notifyCheckpointComplete() called on closed source");
+            return;
+        }
+
+        final DebeziumChangeConsumer<T> consumer = this.debeziumConsumer;
+        if (consumer == null) {
+            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+            return;
+        }
+
+        try {
+            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+            if (posInMap == -1) {
+                LOG.warn(
+                    "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    checkpointId);
+                return;
+            }
+
+            byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+            // remove older checkpoints in map
+            for (int i = 0; i < posInMap; i++) {
+                pendingOffsetsToCommit.remove(0);
+            }
+
+            if (serializedOffsets == null || serializedOffsets.length == 0) {
+                LOG.debug(
+                    "Consumer subtask {} has empty checkpoint state.",
+                    getRuntimeContext().getIndexOfThisSubtask());
+                return;
+            }
+
+            DebeziumOffset offset = DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+            consumer.commitOffset(offset);
+        } catch (Exception e) {
+            // ignore exception if we are no longer running
+            LOG.warn("Ignore error when committing offset to database.", e);
+        }
+    }
+
     @Override
     public void cancel() {
         // flag the main thread to exit. A thread interrupt will come anyways.
@@ -395,4 +468,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
     public TypeInformation<T> getProducedType() {
         return deserializer.getProducedType();
     }
+
+    @VisibleForTesting
+    public LinkedMap getPendingOffsetsToCommit() {
+        return pendingOffsetsToCommit;
+    }
 }
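The pendingOffsetsToCommit bookkeeping above follows the checkpoint-subsumption pattern also used by Flink's Kafka consumer: each snapshot stores its serialized offset under the checkpoint id, and when a checkpoint is confirmed its offset is committed while all older entries are dropped, since completing checkpoint N subsumes every checkpoint before it. A self-contained sketch of that LinkedMap behavior, with placeholder offset values:

import org.apache.commons.collections.map.LinkedMap;

public class PendingOffsetsDemo {
    public static void main(String[] args) {
        LinkedMap pending = new LinkedMap(); // preserves insertion order
        pending.put(101L, "offset-at-cp-101");
        pending.put(201L, "offset-at-cp-201");
        pending.put(301L, "offset-at-cp-301");

        // checkpoint 201 completes: commit its offset and drop everything older
        int pos = pending.indexOf(201L);       // position in insertion order, here 1
        Object toCommit = pending.remove(pos); // "offset-at-cp-201"
        for (int i = 0; i < pos; i++) {
            pending.remove(0);                 // checkpoint 101 is subsumed by 201
        }

        System.out.println(toCommit);          // offset-at-cp-201
        System.out.println(pending.size());    // 1, only checkpoint 301 remains
    }
}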

@@ -26,8 +26,10 @@ import org.apache.flink.util.Collector;
 import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import io.debezium.connector.SnapshotRecord;
 import io.debezium.data.Envelope;
+import io.debezium.embedded.EmbeddedEngineChangeEvent;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -48,6 +50,9 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
     private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);

+    public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
+    public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
+
     private final SourceFunction.SourceContext<T> sourceContext;

     /** The lock that guarantees that record emission and state updates are atomic,
@@ -66,21 +71,27 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
     private final DebeziumOffsetSerializer stateSerializer;

+    private final String heartbeatTopicPrefix;
+
     private boolean isInDbSnapshotPhase;

     private boolean lockHold = false;

+    private DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;
+
     // ------------------------------------------------------------------------

     public DebeziumChangeConsumer(
             SourceFunction.SourceContext<T> sourceContext,
             DebeziumDeserializationSchema<T> deserialization,
             boolean isInDbSnapshotPhase,
-            ErrorReporter errorReporter) {
+            ErrorReporter errorReporter,
+            String heartbeatTopicPrefix) {
         this.sourceContext = sourceContext;
         this.checkpointLock = sourceContext.getCheckpointLock();
         this.deserialization = deserialization;
         this.isInDbSnapshotPhase = isInDbSnapshotPhase;
+        this.heartbeatTopicPrefix = heartbeatTopicPrefix;
         this.debeziumCollector = new DebeziumCollector();
         this.errorReporter = errorReporter;
         this.debeziumOffset = new DebeziumOffset();
@@ -91,9 +102,15 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
     public void handleBatch(
             List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents,
             DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {
+        this.currentCommitter = committer;
         try {
             for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
                 SourceRecord record = event.value();
+                if (isHeartbeatEvent(record)) {
+                    // drop heartbeat events
+                    continue;
+                }
                 deserialization.deserialize(record, debeziumCollector);

                 if (isInDbSnapshotPhase) {
@@ -118,6 +135,11 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
         }
     }

+    private boolean isHeartbeatEvent(SourceRecord record) {
+        String topic = record.topic();
+        return topic != null && topic.startsWith(heartbeatTopicPrefix);
+    }
+
     private boolean isSnapshotRecord(SourceRecord record) {
         Struct value = (Struct) record.value();
         if (value != null) {
@@ -178,6 +200,43 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
         return stateSerializer.serialize(debeziumOffset);
     }

+    @SuppressWarnings("unchecked")
+    public void commitOffset(DebeziumOffset offset) throws InterruptedException {
+        if (currentCommitter == null) {
+            LOG.info("commitOffset() called on Debezium ChangeConsumer which hasn't received records yet.");
+            return;
+        }
+
+        // only the offset is used
+        SourceRecord recordWrapper = new SourceRecord(
+            offset.sourcePartition,
+            adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
+            "DUMMY",
+            Schema.BOOLEAN_SCHEMA,
+            true);
+        EmbeddedEngineChangeEvent<SourceRecord, SourceRecord> changeEvent =
+            new EmbeddedEngineChangeEvent<>(null, recordWrapper, recordWrapper);
+        currentCommitter.markProcessed(changeEvent);
+        currentCommitter.markBatchFinished();
+    }
+
+    /**
+     * We have to adjust the type of LSN values to Long, because a value might be Integer after
+     * deserialization, while {@code io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(java.util.Map)}
+     * requires Long.
+     */
+    private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
+        if (sourceOffset.containsKey(LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
+            String value = sourceOffset.get(LAST_COMPLETELY_PROCESSED_LSN_KEY).toString();
+            sourceOffset.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
+        }
+        if (sourceOffset.containsKey(LAST_COMMIT_LSN_KEY)) {
+            String value = sourceOffset.get(LAST_COMMIT_LSN_KEY).toString();
+            sourceOffset.put(LAST_COMMIT_LSN_KEY, Long.parseLong(value));
+        }
+        return sourceOffset;
+    }
+
     private class DebeziumCollector implements Collector<T> {

         private final Queue<T> records = new ArrayDeque<>();

@@ -0,0 +1,57 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.embedded;
+
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.RecordChangeEvent;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Copied from Debezium project. Make it public to be accessible from
+ * {@link com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer}.
+ */
+public class EmbeddedEngineChangeEvent<K, V> implements ChangeEvent<K, V>, RecordChangeEvent<V> {
+
+    private final K key;
+    private final V value;
+    private final SourceRecord sourceRecord;
+
+    public EmbeddedEngineChangeEvent(K key, V value, SourceRecord sourceRecord) {
+        this.key = key;
+        this.value = value;
+        this.sourceRecord = sourceRecord;
+    }
+
+    @Override
+    public K key() {
+        return key;
+    }
+
+    @Override
+    public V value() {
+        return value;
+    }
+
+    @Override
+    public V record() {
+        return value;
+    }
+
+    @Override
+    public String destination() {
+        return sourceRecord.topic();
+    }
+
+    public SourceRecord sourceRecord() {
+        return sourceRecord;
+    }
+
+    @Override
+    public String toString() {
+        return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value + ", sourceRecord=" + sourceRecord + "]";
+    }
+}
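The copy has to live in package io.debezium.embedded because the upstream class is not public (its javadoc above notes it was made public to be accessible); re-declaring it in the same package is what lets DebeziumChangeConsumer instantiate it. Construction mirrors commitOffset() above; the partition and offset maps below are placeholders:

import io.debezium.embedded.EmbeddedEngineChangeEvent;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Collections;
import java.util.Map;

Map<String, String> sourcePartition = Collections.singletonMap("server", "postgres_cdc_source");
Map<String, Object> sourceOffset = Collections.singletonMap("lsn_commit", 23074056L); // placeholder LSN

// only partition and offset matter once the event reaches the RecordCommitter
SourceRecord dummy = new SourceRecord(
        sourcePartition, sourceOffset, "DUMMY", Schema.BOOLEAN_SCHEMA, true);
EmbeddedEngineChangeEvent<SourceRecord, SourceRecord> event =
        new EmbeddedEngineChangeEvent<>(null, dummy, dummy);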

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.ververica.cdc.connectors.postgres;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DebeziumDeserializationSchema} which wraps a real {@link DebeziumDeserializationSchema}
+ * to drop heartbeat events.
+ *
+ * @see Heartbeat
+ */
+public class HeartbeatEventFilter<T> implements DebeziumDeserializationSchema<T> {
+    private static final long serialVersionUID = -4450118969976653497L;
+
+    private final String heartbeatTopicPrefix;
+    private final DebeziumDeserializationSchema<T> serializer;
+
+    public HeartbeatEventFilter(String heartbeatTopicPrefix, DebeziumDeserializationSchema<T> serializer) {
+        this.heartbeatTopicPrefix = checkNotNull(heartbeatTopicPrefix);
+        this.serializer = checkNotNull(serializer);
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+        String topic = record.topic();
+        if (topic != null && topic.startsWith(heartbeatTopicPrefix)) {
+            // drop heartbeat events
+            return;
+        }
+        serializer.deserialize(record, out);
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return serializer.getProducedType();
+    }
+}
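A possible wiring of the filter, assuming Debezium's default heartbeat topic prefix "__debezium-heartbeat" and the connector's StringDebeziumDeserializationSchema as the wrapped schema:

import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

DebeziumDeserializationSchema<String> inner = new StringDebeziumDeserializationSchema();
DebeziumDeserializationSchema<String> filtered =
        new HeartbeatEventFilter<>("__debezium-heartbeat", inner); // prefix must match heartbeat.topics.prefix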

@@ -22,6 +22,7 @@ import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
 import io.debezium.connector.postgresql.PostgresConnector;

+import java.time.Duration;
 import java.util.Properties;

 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -31,6 +32,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class PostgreSQLSource {

+    private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();
+
     public static <T> Builder<T> builder() {
         return new Builder<>();
     }
@@ -167,6 +170,9 @@ public class PostgreSQLSource {
             props.setProperty("database.password", checkNotNull(password));
             props.setProperty("database.port", String.valueOf(port));
             props.setProperty("slot.name", slotName);
+            // we have to enable heartbeat for PG to make sure DebeziumChangeConsumer#handleBatch
+            // is invoked after job restart
+            props.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS));

             if (schemaList != null) {
                 props.setProperty("schema.whitelist", String.join(",", schemaList));

@@ -44,10 +44,14 @@ import org.junit.Test;

 import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -62,12 +66,14 @@ import static com.alibaba.ververica.cdc.connectors.utils.AssertUtils.assertUpdat
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;

 /**
  * Tests for {@link PostgreSQLSource} which also heavily tests {@link DebeziumSourceFunction}.
  */
 public class PostgreSQLSourceTest extends PostgresTestBase {

+    private static final String SLOT_NAME = "flink";
+
     @Before
     public void before() {
@@ -187,7 +193,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
         assertEquals(1, offsetState.list.size());
         String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
-        assertEquals("postgres_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
+        assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
         assertEquals("557", JsonPath.read(state, "$.sourceOffset.txId").toString());
         assertEquals("true", JsonPath.read(state, "$.sourceOffset.last_snapshot_record").toString());
         assertEquals("true", JsonPath.read(state, "$.sourceOffset.snapshot").toString());
@@ -237,7 +243,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
         assertEquals(1, offsetState.list.size());
         String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
-        assertEquals("postgres_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
+        assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
         assertEquals("558", JsonPath.read(state, "$.sourceOffset.txId").toString());
         assertTrue(state.contains("ts_usec"));
         assertFalse(state.contains("snapshot"));
@@ -298,7 +304,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
         }
         assertEquals(1, offsetState.list.size());
         String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
-        assertEquals("postgres_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
+        assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
         assertEquals("561", JsonPath.read(state, "$.sourceOffset.txId").toString());
         assertTrue(state.contains("ts_usec"));
         assertFalse(state.contains("snapshot"));
@@ -339,7 +345,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
         }
         assertEquals(1, offsetState.list.size());
         String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
-        assertEquals("postgres_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
+        assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
         assertEquals("561", JsonPath.read(state, "$.sourceOffset.txId").toString());
         assertTrue(state.contains("ts_usec"));
         assertFalse(state.contains("snapshot"));
@@ -352,11 +358,131 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
         }
     }

+    @Test
+    public void testFlushLsn() throws Exception {
+        final TestingListState<byte[]> offsetState = new TestingListState<>();
+        final TestingListState<String> historyState = new TestingListState<>();
+        final LinkedHashSet<String> flushLsn = new LinkedHashSet<>();
+        {
+            // ---------------------------------------------------------------------------
+            // Step-1: start the source from empty state
+            // ---------------------------------------------------------------------------
+            final DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSource();
+            final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
+            // setup source with empty state
+            setupSource(source, false, offsetState, historyState, true, 0, 1);
+
+            final CheckedThread runThread = new CheckedThread() {
+                @Override
+                public void go() throws Exception {
+                    source.run(sourceContext);
+                }
+            };
+            runThread.start();
+
+            // wait until the initial snapshot records are emitted
+            int received = drain(sourceContext, 9).size();
+            assertEquals(9, received);
+
+            // ---------------------------------------------------------------------------
+            // Step-2: trigger checkpoint-1 after snapshot finished
+            // ---------------------------------------------------------------------------
+            synchronized (sourceContext.getCheckpointLock()) {
+                // trigger checkpoint-1
+                source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
+            }
+            source.notifyCheckpointComplete(101);
+            assertTrue(flushLsn.add(getConfirmedFlushLsn()));
+
+            batchInsertAndCheckpoint(5, source, sourceContext, 201);
+            assertEquals(1, source.getPendingOffsetsToCommit().size());
+            source.notifyCheckpointComplete(201);
+            assertEquals(0, source.getPendingOffsetsToCommit().size());
+            assertTrue(flushLsn.add(getConfirmedFlushLsn()));
+
+            batchInsertAndCheckpoint(1, source, sourceContext, 301);
+            // do not notify checkpoint complete, so the LSN should not advance
+            assertFalse(flushLsn.add(getConfirmedFlushLsn()));
+
+            // make sure there are no more events
+            assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext));
+
+            source.cancel();
+            source.close();
+            runThread.sync();
+        }
+
+        {
+            // ---------------------------------------------------------------------------
+            // Step-3: restore the source from state
+            // ---------------------------------------------------------------------------
+            final DebeziumSourceFunction<SourceRecord> source2 = createPostgreSqlSource();
+            final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
+            // setup source to restore from the previous state
+            setupSource(source2, true, offsetState, historyState, true, 0, 1);
+
+            final CheckedThread runThread = new CheckedThread() {
+                @Override
+                public void go() throws Exception {
+                    source2.run(sourceContext2);
+                }
+            };
+            runThread.start();
+
+            assertFalse(flushLsn.add(getConfirmedFlushLsn()));
+            batchInsertAndCheckpoint(0, source2, sourceContext2, 401);
+            Thread.sleep(3_000); // wait for heartbeat events; the heartbeat interval is set to 1s
+            // trigger a checkpoint once again to make sure the ChangeConsumer is initialized
+            batchInsertAndCheckpoint(0, source2, sourceContext2, 402);
+            source2.notifyCheckpointComplete(402);
+            assertTrue(flushLsn.add(getConfirmedFlushLsn()));
+
+            batchInsertAndCheckpoint(3, source2, sourceContext2, 501);
+            batchInsertAndCheckpoint(2, source2, sourceContext2, 502);
+            batchInsertAndCheckpoint(1, source2, sourceContext2, 503);
+            assertEquals(3, source2.getPendingOffsetsToCommit().size());
+            source2.notifyCheckpointComplete(503);
+            assertTrue(flushLsn.add(getConfirmedFlushLsn()));
+            assertEquals(0, source2.getPendingOffsetsToCommit().size());
+
+            // make sure there are no more events
+            assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext2));
+
+            source2.cancel();
+            source2.close();
+            runThread.sync();
+        }
+
+        assertEquals(4, flushLsn.size());
+    }
+
+    private void batchInsertAndCheckpoint(
+            int num,
+            DebeziumSourceFunction<SourceRecord> source,
+            TestSourceContext<SourceRecord> sourceContext,
+            long checkpointId) throws Exception {
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            for (int i = 0; i < num; i++) {
+                statement.execute("INSERT INTO inventory.products VALUES (default,'dummy','My Dummy',1.1)");
+            }
+        }
+        assertEquals(num, drain(sourceContext, num).size());
+        synchronized (sourceContext.getCheckpointLock()) {
+            // trigger a checkpoint with the given checkpoint id
+            source.snapshotState(new StateSnapshotContextSynchronousImpl(checkpointId, checkpointId));
+        }
+    }
+
     // ------------------------------------------------------------------------------------------
     // Utilities
     // ------------------------------------------------------------------------------------------

     private DebeziumSourceFunction<SourceRecord> createPostgreSqlSource() {
+        Properties properties = new Properties();
+        properties.setProperty("heartbeat.interval.ms", "1000");
         return PostgreSQLSource.<SourceRecord>builder()
             .hostname(POSTGERS_CONTAINER.getHost())
             .port(POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
@@ -366,9 +492,28 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
             .schemaList("inventory")
             .tableList("inventory.products")
             .deserializer(new ForwardDeserializeSchema())
+            .debeziumProperties(properties)
             .build();
     }

+    private String getConfirmedFlushLsn() throws SQLException {
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery(String.format(
+                "select * from pg_replication_slots where slot_name = '%s' and database = '%s' and plugin = '%s'",
+                SLOT_NAME,
+                POSTGERS_CONTAINER.getDatabaseName(),
+                "decoderbufs"));
+            if (rs.next()) {
+                return rs.getString("confirmed_flush_lsn");
+            } else {
+                fail("No replication slot info available");
+            }
+            return null;
+        }
+    }
+
     private <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount) throws Exception {
         List<T> allRecords = new ArrayList<>();
         LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
