diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java
index 295877f1f..932131d96 100644
--- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java
+++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/DebeziumSourceFunction.java
@@ -19,6 +19,8 @@ package com.alibaba.ververica.cdc.debezium;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -44,7 +46,10 @@ import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.embedded.Connect;
 import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
 import io.debezium.relational.history.HistoryRecord;
+import org.apache.commons.collections.map.LinkedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,6 +80,7 @@ import java.util.concurrent.TimeUnit;
 @PublicEvolving
 public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
         CheckpointedFunction,
+        CheckpointListener,
         ResultTypeQueryable<T> {
 
     private static final long serialVersionUID = -5808108641062931623L;
@@ -87,6 +93,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
     /** State name of the consumer's history records state. */
     public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
 
+    /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    // -------------------------------------------------------------------------------------------
+
     /** The schema to convert from Debezium's messages into Flink's objects. */
     private final DebeziumDeserializationSchema<T> deserializer;
@@ -96,6 +107,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
     /** The specific binlog offset to read from when the first startup. */
     private final @Nullable DebeziumOffset specificOffset;
 
+    /** Data for pending but uncommitted offsets. */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
     private ExecutorService executor;
     private DebeziumEngine<?> engine;
@@ -223,12 +237,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
         if (!running) {
             LOG.debug("snapshotState() called on closed source");
         } else {
-            snapshotOffsetState();
+            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
             snapshotHistoryRecordsState();
         }
     }
 
-    private void snapshotOffsetState() throws Exception {
+    private void snapshotOffsetState(long checkpointId) throws Exception {
         offsetState.clear();
 
         final DebeziumChangeConsumer<?> consumer = this.debeziumConsumer;
@@ -254,6 +268,14 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
 
         if (serializedOffset != null) {
             offsetState.add(serializedOffset);
+            // the map cannot be asynchronously updated, because only one checkpoint call
+            // can happen on this function at a time: either snapshotState() or
+            // notifyCheckpointComplete()
+            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+            // truncate the map of pending offsets to commit, to prevent infinite growth
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                pendingOffsetsToCommit.remove(0);
+            }
         }
     }
@@ -301,16 +323,22 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
         // history instance name to initialize FlinkDatabaseHistory
         properties.setProperty(FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
 
+        // we have to filter out the heartbeat events, otherwise the deserializer will fail
+        String dbzHeartbeatPrefix = properties.getProperty(
+            Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+            Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
         this.debeziumConsumer = new DebeziumChangeConsumer<>(
             sourceContext,
             deserializer,
             restoredOffsetState == null, // DB snapshot phase if restore state is null
-            this::reportError);
+            this::reportError,
+            dbzHeartbeatPrefix);
 
         // create the engine with this configuration ...
         this.engine = DebeziumEngine.create(Connect.class)
             .using(properties)
             .notifying(debeziumConsumer)
+            .using(OffsetCommitPolicy.always())
             .using((success, message, error) -> {
                 if (!success && error != null) {
                     this.reportError(error);
                 }
@@ -346,6 +374,51 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
         }
     }
 
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (!running) {
+            LOG.debug("notifyCheckpointComplete() called on closed source");
+            return;
+        }
+
+        final DebeziumChangeConsumer<?> consumer = this.debeziumConsumer;
+        if (consumer == null) {
+            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+            return;
+        }
+
+        try {
+            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+            if (posInMap == -1) {
+                LOG.warn(
+                    "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    checkpointId);
+                return;
+            }
+
+            byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+            // remove older checkpoints in map
+            for (int i = 0; i < posInMap; i++) {
+                pendingOffsetsToCommit.remove(0);
+            }
+
+            if (serializedOffsets == null || serializedOffsets.length == 0) {
+                LOG.debug(
+                    "Consumer subtask {} has empty checkpoint state.",
+                    getRuntimeContext().getIndexOfThisSubtask());
+                return;
+            }
+
+            DebeziumOffset offset = DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+            consumer.commitOffset(offset);
+        } catch (Exception e) {
+            // ignore exception if we are no longer running
+            LOG.warn("Ignore error when committing offset to database.", e);
+        }
+    }
+
     @Override
     public void cancel() {
         // flag the main thread to exit. A thread interrupt will come anyways.
@@ -395,4 +468,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
     public TypeInformation<T> getProducedType() {
         return deserializer.getProducedType();
     }
+
+    @VisibleForTesting
+    public LinkedMap getPendingOffsetsToCommit() {
+        return pendingOffsetsToCommit;
+    }
 }
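The bookkeeping above follows the same pattern as Flink's Kafka consumer: snapshotState() registers the serialized offset under its checkpoint id, and notifyCheckpointComplete() commits that offset and discards every older entry, because a completed checkpoint subsumes all checkpoints before it. A minimal standalone sketch of that contract (the class and method names here are illustrative, not part of the patch):

import org.apache.commons.collections.map.LinkedMap;

/** Illustrative only: the subsumption contract behind pendingOffsetsToCommit. */
public class PendingOffsetsSketch {
    private static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    private final LinkedMap pending = new LinkedMap(); // checkpointId -> serialized offset

    /** Called from snapshotState(); bounded so missing notifications cannot leak memory. */
    void onSnapshot(long checkpointId, byte[] serializedOffset) {
        pending.put(checkpointId, serializedOffset);
        while (pending.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            pending.remove(0);
        }
    }

    /** Called from notifyCheckpointComplete(); returns the offset to commit, or null if unknown. */
    byte[] onCompleted(long checkpointId) {
        int pos = pending.indexOf(checkpointId);
        if (pos == -1) {
            return null; // unknown or already-truncated checkpoint
        }
        byte[] offset = (byte[]) pending.remove(pos);
        for (int i = 0; i < pos; i++) {
            pending.remove(0); // checkpoint N subsumes every earlier checkpoint
        }
        return offset;
    }
}
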
diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java
index f722aac8b..6d9c87624 100644
--- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java
+++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java
@@ -26,8 +26,10 @@ import org.apache.flink.util.Collector;
 import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import io.debezium.connector.SnapshotRecord;
 import io.debezium.data.Envelope;
+import io.debezium.embedded.EmbeddedEngineChangeEvent;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -48,6 +50,9 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
 
+    public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
+    public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
+
     private final SourceFunction.SourceContext<T> sourceContext;
 
     /** The lock that guarantees that record emission and state updates are atomic,
@@ -66,21 +71,27 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
 
     private final DebeziumOffsetSerializer stateSerializer;
 
+    private final String heartbeatTopicPrefix;
+
     private boolean isInDbSnapshotPhase;
 
     private boolean lockHold = false;
 
+    private DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;
+
     // ------------------------------------------------------------------------
 
     public DebeziumChangeConsumer(
             SourceFunction.SourceContext<T> sourceContext,
             DebeziumDeserializationSchema<T> deserialization,
             boolean isInDbSnapshotPhase,
-            ErrorReporter errorReporter) {
+            ErrorReporter errorReporter,
+            String heartbeatTopicPrefix) {
         this.sourceContext = sourceContext;
         this.checkpointLock = sourceContext.getCheckpointLock();
         this.deserialization = deserialization;
         this.isInDbSnapshotPhase = isInDbSnapshotPhase;
+        this.heartbeatTopicPrefix = heartbeatTopicPrefix;
         this.debeziumCollector = new DebeziumCollector();
         this.errorReporter = errorReporter;
         this.debeziumOffset = new DebeziumOffset();
@@ -91,9 +102,15 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
     public void handleBatch(
             List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents,
             DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {
+        this.currentCommitter = committer;
         try {
             for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
                 SourceRecord record = event.value();
+                if (isHeartbeatEvent(record)) {
+                    // drop heartbeat events
+                    continue;
+                }
+
                 deserialization.deserialize(record, debeziumCollector);
 
                 if (isInDbSnapshotPhase) {
@@ -118,6 +135,11 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
         }
     }
 
+    private boolean isHeartbeatEvent(SourceRecord record) {
+        String topic = record.topic();
+        return topic != null && topic.startsWith(heartbeatTopicPrefix);
+    }
+
     private boolean isSnapshotRecord(SourceRecord record) {
         Struct value = (Struct) record.value();
         if (value != null) {
@@ -178,6 +200,43 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
         return stateSerializer.serialize(debeziumOffset);
     }
 
+    @SuppressWarnings("unchecked")
+    public void commitOffset(DebeziumOffset offset) throws InterruptedException {
+        if (currentCommitter == null) {
+            LOG.info("commitOffset() called on Debezium ChangeConsumer which hasn't received records yet.");
+            return;
+        }
+
+        // only the offset is used
+        SourceRecord recordWrapper = new SourceRecord(
+            offset.sourcePartition,
+            adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
+            "DUMMY",
+            Schema.BOOLEAN_SCHEMA,
+            true);
+        EmbeddedEngineChangeEvent<SourceRecord, SourceRecord> changeEvent = new EmbeddedEngineChangeEvent<>(
+            null, recordWrapper, recordWrapper);
+        currentCommitter.markProcessed(changeEvent);
+        currentCommitter.markBatchFinished();
+    }
+
+    /**
+     * We have to adjust the type of the LSN values to Long, because they might be Integer after deserialization,
+     * however {@code io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(java.util.Map)}
+     * requires Long.
+     */
+    private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
+        if (sourceOffset.containsKey(LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
+            String value = sourceOffset.get(LAST_COMPLETELY_PROCESSED_LSN_KEY).toString();
+            sourceOffset.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
+        }
+        if (sourceOffset.containsKey(LAST_COMMIT_LSN_KEY)) {
+            String value = sourceOffset.get(LAST_COMMIT_LSN_KEY).toString();
+            sourceOffset.put(LAST_COMMIT_LSN_KEY, Long.parseLong(value));
+        }
+        return sourceOffset;
+    }
+
     private class DebeziumCollector implements Collector<T> {
 
         private final Queue<T> records = new ArrayDeque<>();
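Two subtleties in commitOffset() deserve a note. First, the embedded engine only flushes offsets for records that were acknowledged through its RecordCommitter, so the consumer wraps the checkpointed offset into a dummy SourceRecord and marks it processed. Second, adjustSourceOffset() exists because the offset round-trips through JSON in DebeziumOffsetSerializer, and a JSON number that fits in 32 bits deserializes as Integer even though the Postgres connector casts the LSN values to Long. A small sketch of that pitfall, using plain Jackson to reproduce the JSON round-trip (an assumption for illustration; only the Integer-vs-Long behavior matters):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class LsnTypeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // the LSN was written as a long, but small values deserialize as Integer
        Map<String, Object> offset = mapper.readValue(
            "{\"lsn_proc\": 23500168, \"lsn_commit\": 23500168}", Map.class);
        System.out.println(offset.get("lsn_commit").getClass()); // class java.lang.Integer
        // a blind (Long) cast on the connector side would throw ClassCastException,
        // hence the Long.parseLong(value.toString()) adjustment in adjustSourceOffset()
        offset.put("lsn_commit", Long.parseLong(offset.get("lsn_commit").toString()));
        System.out.println(offset.get("lsn_commit").getClass()); // class java.lang.Long
    }
}
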
diff --git a/flink-connector-debezium/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java b/flink-connector-debezium/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java
new file mode 100644
index 000000000..61f3135be
--- /dev/null
+++ b/flink-connector-debezium/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.embedded;
+
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.RecordChangeEvent;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Copied from the Debezium project. Made public to be accessible from
+ * {@link com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer}.
+ */
+public class EmbeddedEngineChangeEvent<K, V> implements ChangeEvent<K, V>, RecordChangeEvent<V> {
+
+    private final K key;
+    private final V value;
+    private final SourceRecord sourceRecord;
+
+    public EmbeddedEngineChangeEvent(K key, V value, SourceRecord sourceRecord) {
+        this.key = key;
+        this.value = value;
+        this.sourceRecord = sourceRecord;
+    }
+
+    @Override
+    public K key() {
+        return key;
+    }
+
+    @Override
+    public V value() {
+        return value;
+    }
+
+    @Override
+    public V record() {
+        return value;
+    }
+
+    @Override
+    public String destination() {
+        return sourceRecord.topic();
+    }
+
+    public SourceRecord sourceRecord() {
+        return sourceRecord;
+    }
+
+    @Override
+    public String toString() {
+        return "EmbeddedEngineChangeEvent [key=" + key + ", value=" + value + ", sourceRecord=" + sourceRecord + "]";
+    }
+}
diff --git a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java
new file mode 100644
index 000000000..e2e0eab23
--- /dev/null
+++ b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.ververica.cdc.connectors.postgres;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DebeziumDeserializationSchema} which wraps a real {@link DebeziumDeserializationSchema}
+ * to drop heartbeat events.
+ *
+ * @see Heartbeat
+ */
+public class HeartbeatEventFilter<T> implements DebeziumDeserializationSchema<T> {
+    private static final long serialVersionUID = -4450118969976653497L;
+
+    private final String heartbeatTopicPrefix;
+    private final DebeziumDeserializationSchema<T> serializer;
+
+    public HeartbeatEventFilter(String heartbeatTopicPrefix, DebeziumDeserializationSchema<T> serializer) {
+        this.heartbeatTopicPrefix = checkNotNull(heartbeatTopicPrefix);
+        this.serializer = checkNotNull(serializer);
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+        String topic = record.topic();
+        if (topic != null && topic.startsWith(heartbeatTopicPrefix)) {
+            // drop heartbeat events
+            return;
+        }
+        serializer.deserialize(record, out);
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return serializer.getProducedType();
+    }
+}
diff --git a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSource.java b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSource.java
index 3266cfe87..31390eb6d 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSource.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSource.java
@@ -22,6 +22,7 @@ import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
 import io.debezium.connector.postgresql.PostgresConnector;
 
+import java.time.Duration;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -31,6 +32,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class PostgreSQLSource {
 
+    private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();
+
     public static <T> Builder<T> builder() {
         return new Builder<>();
     }
@@ -167,6 +170,9 @@ public class PostgreSQLSource {
             props.setProperty("database.password", checkNotNull(password));
             props.setProperty("database.port", String.valueOf(port));
             props.setProperty("slot.name", slotName);
+            // we have to enable heartbeat for PG to make sure DebeziumChangeConsumer#handleBatch
+            // is invoked after job restart
+            props.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS));
 
             if (schemaList != null) {
                 props.setProperty("schema.whitelist", String.join(",", schemaList));
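HeartbeatEventFilter (added above) offers the same protection one level higher, for users who wire a custom deserialization schema and want heartbeat records dropped before they reach it. A usage sketch, assuming Debezium's default heartbeat topic prefix __debezium-heartbeat and the connector's StringDebeziumDeserializationSchema as the wrapped schema:

import com.alibaba.ververica.cdc.connectors.postgres.HeartbeatEventFilter;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class HeartbeatFilterSketch {
    public static DebeziumDeserializationSchema<String> heartbeatSafe() {
        // the prefix must match the connector's heartbeat.topics.prefix setting;
        // heartbeat records ("__debezium-heartbeat.<server>") are dropped, all
        // other records are forwarded to the wrapped schema untouched
        return new HeartbeatEventFilter<>(
            "__debezium-heartbeat", new StringDebeziumDeserializationSchema());
    }
}
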
diff --git a/flink-connector-postgres-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSourceTest.java b/flink-connector-postgres-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSourceTest.java
index d5690265a..5705a4925 100644
--- a/flink-connector-postgres-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSourceTest.java
+++ b/flink-connector-postgres-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/postgres/PostgreSQLSourceTest.java
@@ -44,10 +44,14 @@ import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -62,12 +66,14 @@ import static com.alibaba.ververica.cdc.connectors.utils.AssertUtils.assertUpdate;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
 
 /**
  * Tests for {@link PostgreSQLSource} which also heavily tests {@link DebeziumSourceFunction}.
  */
 public class PostgreSQLSourceTest extends PostgresTestBase {
+    private static final String SLOT_NAME = "flink";
 
     @Before
     public void before() {
@@ -187,7 +193,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
 
         assertEquals(1, offsetState.list.size());
         String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
-        assertEquals("postgres_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
+        assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
         assertEquals("557", JsonPath.read(state, "$.sourceOffset.txId").toString());
         assertEquals("true", JsonPath.read(state, "$.sourceOffset.last_snapshot_record").toString());
         assertEquals("true", JsonPath.read(state, "$.sourceOffset.snapshot").toString());
@@ -237,7 +243,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
 
         assertEquals(1, offsetState.list.size());
         String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
-        assertEquals("postgres_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
+        assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
         assertEquals("558", JsonPath.read(state, "$.sourceOffset.txId").toString());
         assertTrue(state.contains("ts_usec"));
         assertFalse(state.contains("snapshot"));
@@ -298,7 +304,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
         }
         assertEquals(1, offsetState.list.size());
         String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
-        assertEquals("postgres_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
+        assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
         assertEquals("561", JsonPath.read(state, "$.sourceOffset.txId").toString());
         assertTrue(state.contains("ts_usec"));
         assertFalse(state.contains("snapshot"));
@@ -339,7 +345,7 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
         }
         assertEquals(1, offsetState.list.size());
         String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
-        assertEquals("postgres_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
+        assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
         assertEquals("561", JsonPath.read(state, "$.sourceOffset.txId").toString());
         assertTrue(state.contains("ts_usec"));
         assertFalse(state.contains("snapshot"));
@@ -352,11 +358,131 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
         }
     }
 
+    @Test
+    public void testFlushLsn() throws Exception {
+        final TestingListState<byte[]> offsetState = new TestingListState<>();
+        final TestingListState<String> historyState = new TestingListState<>();
+        final LinkedHashSet<String> flushLsn = new LinkedHashSet<>();
+        {
+            // ---------------------------------------------------------------------------
+            // Step-1: start the source from empty state
+            // ---------------------------------------------------------------------------
+            final DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSource();
+            final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
+            // setup source with empty state
+            setupSource(source, false, offsetState, historyState, true, 0, 1);
+
+            final CheckedThread runThread = new CheckedThread() {
+                @Override
+                public void go() throws Exception {
+                    source.run(sourceContext);
+                }
+            };
+            runThread.start();
+
+            // wait until the consumer is started
+            int received = drain(sourceContext, 9).size();
+            assertEquals(9, received);
+
+            // ---------------------------------------------------------------------------
+            // Step-2: trigger checkpoint-1 after snapshot finished
+            // ---------------------------------------------------------------------------
+            synchronized (sourceContext.getCheckpointLock()) {
+                // trigger checkpoint-1
+                source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
+            }
+            source.notifyCheckpointComplete(101);
+            assertTrue(flushLsn.add(getConfirmedFlushLsn()));
+
+            batchInsertAndCheckpoint(5, source, sourceContext, 201);
+            assertEquals(1, source.getPendingOffsetsToCommit().size());
+            source.notifyCheckpointComplete(201);
+            assertEquals(0, source.getPendingOffsetsToCommit().size());
+            assertTrue(flushLsn.add(getConfirmedFlushLsn()));
+
+            batchInsertAndCheckpoint(1, source, sourceContext, 301);
+            // do not notify checkpoint complete, to verify that the LSN is not advanced
+            assertFalse(flushLsn.add(getConfirmedFlushLsn()));
+
+            // make sure there are no more events
+            assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext));
+
+            source.cancel();
+            source.close();
+            runThread.sync();
+        }
+
+        {
+            // ---------------------------------------------------------------------------
+            // Step-3: restore the source from state
+            // ---------------------------------------------------------------------------
+            final DebeziumSourceFunction<SourceRecord> source2 = createPostgreSqlSource();
+            final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
+            // setup source with the restored state
+            setupSource(source2, true, offsetState, historyState, true, 0, 1);
+
+            final CheckedThread runThread = new CheckedThread() {
+                @Override
+                public void go() throws Exception {
+                    source2.run(sourceContext2);
+                }
+            };
+            runThread.start();
+
+            assertFalse(flushLsn.add(getConfirmedFlushLsn()));
+
+            batchInsertAndCheckpoint(0, source2, sourceContext2, 401);
+            Thread.sleep(3_000); // wait for heartbeat events; the test sets a 1s heartbeat interval
+            // trigger a checkpoint once again to make sure the ChangeConsumer is initialized
+            batchInsertAndCheckpoint(0, source2, sourceContext2, 402);
+            source2.notifyCheckpointComplete(402);
+            assertTrue(flushLsn.add(getConfirmedFlushLsn()));
+
+            batchInsertAndCheckpoint(3, source2, sourceContext2, 501);
+            batchInsertAndCheckpoint(2, source2, sourceContext2, 502);
+            batchInsertAndCheckpoint(1, source2, sourceContext2, 503);
+            assertEquals(3, source2.getPendingOffsetsToCommit().size());
+            source2.notifyCheckpointComplete(503);
+            assertTrue(flushLsn.add(getConfirmedFlushLsn()));
+            assertEquals(0, source2.getPendingOffsetsToCommit().size());
+
+            // make sure there are no more events
+            assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext2));
+
+            source2.cancel();
+            source2.close();
+            runThread.sync();
+        }
+
+        assertEquals(4, flushLsn.size());
+    }
+
+    private void batchInsertAndCheckpoint(
+            int num,
+            DebeziumSourceFunction<SourceRecord> source,
+            TestSourceContext<SourceRecord> sourceContext,
+            long checkpointId) throws Exception {
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            for (int i = 0; i < num; i++) {
+                statement.execute("INSERT INTO inventory.products VALUES (default,'dummy','My Dummy',1.1)");
+            }
+        }
+        assertEquals(num, drain(sourceContext, num).size());
+        synchronized (sourceContext.getCheckpointLock()) {
+            // trigger a checkpoint with the given checkpoint id
+            source.snapshotState(new StateSnapshotContextSynchronousImpl(checkpointId, checkpointId));
+        }
+    }
+
     // ------------------------------------------------------------------------------------------
     // Utilities
     // ------------------------------------------------------------------------------------------
 
     private DebeziumSourceFunction<SourceRecord> createPostgreSqlSource() {
+        Properties properties = new Properties();
+        properties.setProperty("heartbeat.interval.ms", "1000");
         return PostgreSQLSource.<SourceRecord>builder()
             .hostname(POSTGERS_CONTAINER.getHost())
             .port(POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
@@ -366,9 +492,28 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
             .schemaList("inventory")
             .tableList("inventory.products")
             .deserializer(new ForwardDeserializeSchema())
+            .debeziumProperties(properties)
             .build();
     }
 
+    private String getConfirmedFlushLsn() throws SQLException {
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery(String.format(
+                "select * from pg_replication_slots where slot_name = '%s' and database = '%s' and plugin = '%s'",
+                SLOT_NAME,
+                POSTGERS_CONTAINER.getDatabaseName(),
+                "decoderbufs"
+            ));
+            if (rs.next()) {
+                return rs.getString("confirmed_flush_lsn");
+            } else {
+                fail("No replication slot info available");
+            }
+            return null;
+        }
+    }
+
     private List<SourceRecord> drain(TestSourceContext<SourceRecord> sourceContext, int expectedRecordCount) throws Exception {
         List<SourceRecord> allRecords = new ArrayList<>();
         LinkedBlockingQueue<StreamRecord<SourceRecord>> queue = sourceContext.getCollectedOutputs();
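A closing operational note: the replication slot's confirmed_flush_lsn only advances when Flink actually completes checkpoints, because the commit is driven by notifyCheckpointComplete(). A minimal job wiring sketch under that assumption; the interval values and connection settings are placeholders, not part of the patch:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.alibaba.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class PostgresCdcJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // without checkpointing, notifyCheckpointComplete() never fires, so the
        // slot's confirmed_flush_lsn never moves and WAL keeps accumulating
        env.enableCheckpointing(10_000L);

        Properties dbzProps = new Properties();
        dbzProps.setProperty("heartbeat.interval.ms", "1000"); // override the 5-minute default

        DebeziumSourceFunction<String> source = PostgreSQLSource.<String>builder()
            .hostname("localhost")   // placeholder connection settings
            .port(5432)
            .database("postgres")
            .schemaList("inventory")
            .tableList("inventory.products")
            .username("postgres")
            .password("postgres")
            .slotName("flink")
            .deserializer(new StringDebeziumDeserializationSchema())
            .debeziumProperties(dbzProps)
            .build();

        env.addSource(source).print();
        env.execute("PostgreSQL CDC sketch");
    }
}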