[FLINK-35129][postgres] Introduce scan.lsn-commit.checkpoints-num-delay option to delay LSN offset commits

This closes #3349.
Muhammet Orazov authored 8 months ago, committed by GitHub
parent 33891869a9
commit 5b28d1a579

@@ -28,8 +28,7 @@ under the License.
The Postgres CDC connector allows for reading snapshot data and incremental data from a PostgreSQL database. This document describes how to set up the Postgres CDC connector to run SQL queries against PostgreSQL databases.
## Dependencies
To set up the Postgres CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
@@ -45,8 +44,7 @@ Download [flink-sql-connector-postgres-cdc-3.0.1.jar](https://repo1.maven.org/ma
**Note:** Refer to [flink-sql-connector-postgres-cdc](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc); more released versions are available in the Maven Central repository.
## How to create a Postgres CDC table
The Postgres CDC table can be defined as follows:
@@ -76,8 +74,7 @@ CREATE TABLE shipments (
SELECT * FROM shipments;
```
## Connector Options
<div class="highlight">
<table class="colwidths-auto docutils">
@@ -236,12 +233,29 @@ Connector Options
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
<tr>
<td>scan.lsn-commit.checkpoints-num-delay</td>
<td>optional</td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>The number of checkpoints to delay before starting to commit the LSN offsets. <br>
The checkpoint LSN offsets are committed in a rolling fashion: the earliest of the delayed checkpoints is committed first.
</td>
</tr>
</tbody>
</table>
</div>
<div>
### Notes
#### `slot.name` option
It is recommended to set a different `slot.name` for each table to avoid the potential `PSQLException: ERROR: replication slot "flink" is active for PID 974` error. See more [here](https://debezium.io/documentation/reference/2.0/connectors/postgresql.html#postgresql-property-slot-name).
#### `scan.lsn-commit.checkpoints-num-delay` option
When consuming PostgreSQL logs, the LSN offset must be committed to trigger the cleanup of log data for the corresponding slot. However, once the LSN offset is committed, earlier offsets become invalid. To keep earlier LSN offsets available for job recovery, we delay the LSN commit by `scan.lsn-commit.checkpoints-num-delay` (default value: `3`) checkpoints. This feature is available only when the config option `scan.incremental.snapshot.enabled` is set to `true`.
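For DataStream API jobs, the same delay can be set through the new `lsnCommitCheckpointsDelay` builder method introduced by this change. A minimal sketch; the hostname, credentials, slot, and table names below are placeholders, not values from this change:

```java
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

// Sketch: build an incremental Postgres source whose LSN offset commit
// lags five completed checkpoints behind (placeholder connection values).
PostgresIncrementalSource<String> source =
        PostgresIncrementalSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.shipments")
                .username("postgres")
                .password("postgres")
                .slotName("flink")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .lsnCommitCheckpointsDelay(5) // the new option
                .build();
```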
### Incremental Snapshot Options
@@ -340,8 +354,7 @@ The following options is available only when `scan.incremental.snapshot.enabled=
</table>
</div>
## Available Metadata
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
@@ -377,8 +390,7 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
</tbody>
</table>
## Limitation
### Can't perform checkpoint during scanning snapshot of tables when incremental snapshot is disabled
@@ -417,8 +429,7 @@ CREATE TABLE products (
);
```
## Features
### Incremental Snapshot Reading (Experimental)
@@ -522,8 +533,7 @@ public class PostgreSQLSourceExample {
}
```
## Data Type Mapping
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">

@@ -164,6 +164,12 @@ limitations under the License.
<version>${json-path.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- tests will have log4j as the default logging framework available -->

@@ -63,13 +63,11 @@ import static io.debezium.connector.postgresql.Utils.currentOffset;
/** The dialect for Postgres. */
public class PostgresDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;
private static final String CONNECTION_NAME = "postgres-cdc-connector";
private final PostgresSourceConfig sourceConfig;
private transient Tables.TableFilter filters;
private transient CustomPostgresSchema schema;
@Nullable private PostgresStreamFetchTask streamFetchTask;
public PostgresDialect(PostgresSourceConfig sourceConfig) {

@@ -280,6 +280,12 @@ public class PostgresSourceBuilder<T> {
return this;
}
/** Set the number of checkpoints to delay before committing the {@code LSN} offsets for Postgres. */
public PostgresSourceBuilder<T> lsnCommitCheckpointsDelay(int lsnCommitDelay) {
this.configFactory.setLsnCommitCheckpointsDelay(lsnCommitDelay);
return this;
}
/**
* Build the {@link PostgresIncrementalSource}.
*

@@ -37,6 +37,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
private static final long serialVersionUID = 1L;
private final int subtaskId;
private final int lsnCommitCheckpointsDelay;
public PostgresSourceConfig(
int subtaskId,
@@ -64,7 +65,8 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
int connectionPoolSize,
@Nullable String chunkKeyColumn,
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay) {
super(
startupOptions,
databaseList,
@@ -92,14 +94,34 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
skipSnapshotBackfill,
isScanNewlyAddedTableEnabled);
this.subtaskId = subtaskId;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}
/**
* Returns {@code subtaskId} value.
*
* @return subtask id
*/
public int getSubtaskId() {
return subtaskId;
}
/**
* Returns {@code lsnCommitCheckpointsDelay} value.
*
* @return lsn commit checkpoint delay
*/
public int getLsnCommitCheckpointsDelay() {
return this.lsnCommitCheckpointsDelay;
}
/**
* Returns the slot name for backfill task.
*
* @return backfill task slot name
*/
public String getSlotNameForBackfillTask() {
return getDbzProperties().getProperty(SLOT_NAME.name()) + "_" + getSubtaskId();
}
@Override

@@ -50,6 +50,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
private List<String> schemaList;
private int lsnCommitCheckpointsDelay;
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -100,7 +102,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
// The PostgresSource will do snapshot according to its StartupMode.
// Do not need debezium to do the snapshot work.
props.setProperty("snapshot.mode", "never");
Configuration dbzConfiguration = Configuration.from(props);
return new PostgresSourceConfig(
@@ -129,7 +131,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
connectionPoolSize,
chunkKeyColumn,
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay);
}
/**
@@ -173,4 +176,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
public void heartbeatInterval(Duration heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}
/** Sets the LSN commit checkpoints delay for Postgres. */
public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}
}

@@ -78,4 +78,14 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"Optional interval of sending heartbeat event for tracing the latest available replication slot offsets");
public static final ConfigOption<Integer> SCAN_LSN_COMMIT_CHECKPOINTS_DELAY =
ConfigOptions.key("scan.lsn-commit.checkpoints-num-delay")
.intType()
.defaultValue(3)
.withDescription(
"The number of checkpoint delays before starting to commit the LSN offsets.\n"
+ "By setting this to higher value, the offset that is consumed by global slot will be "
+ "committed after multiple checkpoint delays instead of after each checkpoint completion.\n"
+ "This allows continuous recycle of log files in stream phase.");
}

@@ -28,6 +28,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSeriali
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitEvent;
import org.apache.flink.configuration.Configuration;
@@ -38,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Supplier;
/**
@@ -54,6 +56,9 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
/** whether to commit offset. */
private volatile boolean isCommitOffset = false;
private final PriorityQueue<Long> minHeap;
private final int lsnCommitCheckpointsDelay;
public PostgresSourceReader(
FutureCompletingBlockingQueue elementQueue,
Supplier supplier,
@@ -72,6 +77,9 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
sourceConfig,
sourceSplitSerializer,
dialect);
this.lsnCommitCheckpointsDelay =
((PostgresSourceConfig) sourceConfig).getLsnCommitCheckpointsDelay();
this.minHeap = new PriorityQueue<>();
}
@Override
@@ -104,12 +112,23 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
this.minHeap.add(checkpointId);
if (this.minHeap.size() <= this.lsnCommitCheckpointsDelay) {
LOG.info("Pending checkpoints '{}'.", this.minHeap);
return;
}
final long checkpointIdToCommit = this.minHeap.poll();
LOG.info(
"Pending checkpoints '{}', to be committed checkpoint id '{}'.",
this.minHeap,
checkpointIdToCommit);
// After all snapshot splits are finished, update stream split's metadata and reset start
// offset, which may be smaller than before.
// In case that new start-offset of stream split has been recycled, don't commit offset
// during new table added phase.
if (isCommitOffset()) {
super.notifyCheckpointComplete(checkpointIdToCommit);
}
}
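Stripped of the Flink machinery, the delayed-commit logic above is a min-heap gate: completed checkpoint ids accumulate until more than `lsnCommitCheckpointsDelay` of them are pending, and only then is the oldest one released for commit. A self-contained sketch of that pattern, with a `LongConsumer` standing in for the actual LSN commit call:

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Sketch of the rolling LSN-commit gate used by PostgresSourceReader. */
class DelayedCommitGate {
    private final PriorityQueue<Long> pendingCheckpointIds = new PriorityQueue<>(); // min-heap
    private final int delay; // scan.lsn-commit.checkpoints-num-delay
    private final LongConsumer commitAction; // stand-in for the real offset commit

    DelayedCommitGate(int delay, LongConsumer commitAction) {
        this.delay = delay;
        this.commitAction = commitAction;
    }

    /** Called once per completed checkpoint; commits the oldest id once the gate is full. */
    void notifyCheckpointComplete(long checkpointId) {
        pendingCheckpointIds.add(checkpointId);
        if (pendingCheckpointIds.size() <= delay) {
            return; // keep accumulating; earlier LSNs stay available for recovery
        }
        commitAction.accept(pendingCheckpointIds.poll());
    }
}
```

With a delay of 3, feeding ids 103, 102, 101, 104 triggers exactly one commit, for id 101, matching the expectations in the unit test added below.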

@@ -57,6 +57,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSou
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
@@ -114,6 +115,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -158,7 +160,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
chunkKeyColumn,
closeIdlerReaders,
skipSnapshotBackfill,
isScanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay);
}
@Override
@@ -200,6 +203,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
return options;
}

@@ -82,10 +82,9 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
private final StartupOptions startupOptions;
private final String chunkKeyColumn;
private final boolean closeIdleReaders;
private final boolean skipSnapshotBackfill;
private final boolean scanNewlyAddedTableEnabled;
private final int lsnCommitCheckpointsDelay;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -124,7 +123,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
@Nullable String chunkKeyColumn,
boolean closeIdleReaders,
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -155,6 +155,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
this.closeIdleReaders = closeIdleReaders;
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.scanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}
@Override
@@ -216,6 +217,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
.closeIdleReaders(closeIdleReaders)
.skipSnapshotBackfill(skipSnapshotBackfill)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -283,7 +285,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
chunkKeyColumn,
closeIdleReaders,
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;

@@ -220,6 +220,7 @@ public abstract class PostgresTestBase extends AbstractTestBase {
postgresSourceConfigFactory.tableList(schemaName + "." + tableName);
postgresSourceConfigFactory.splitSize(splitSize);
postgresSourceConfigFactory.skipSnapshotBackfill(skipSnapshotBackfill);
postgresSourceConfigFactory.setLsnCommitCheckpointsDelay(1);
return postgresSourceConfigFactory;
}

@@ -707,6 +707,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
.tableList(tableId)
.startupOptions(startupOptions)
.skipSnapshotBackfill(skipSnapshotBackfill)
.lsnCommitCheckpointsDelay(1)
.deserializer(customerTable.getDeserializer())
.build();
@@ -816,7 +817,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s',"
+ " 'scan.lsn-commit.checkpoints-num-delay' = '1'"
+ ""
+ ")",
customDatabase.getHost(),

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.postgres.source.reader;
import org.apache.flink.cdc.connectors.postgres.source.MockPostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import org.apache.flink.cdc.connectors.postgres.testutils.TestTable;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link PostgresSourceReader}. */
class PostgresSourceReaderTest {
@Test
void testNotifyCheckpointWindowSizeOne() throws Exception {
final PostgresSourceReader reader = createReader(1);
final List<Long> completedCheckpointIds = new ArrayList<>();
MockPostgresDialect.setNotifyCheckpointCompleteCallback(
id -> completedCheckpointIds.add(id));
reader.notifyCheckpointComplete(11L);
assertThat(completedCheckpointIds).isEmpty();
reader.notifyCheckpointComplete(12L);
assertThat(completedCheckpointIds).containsExactly(11L);
reader.notifyCheckpointComplete(13L);
assertThat(completedCheckpointIds).containsExactly(11L, 12L);
}
@Test
void testNotifyCheckpointWindowSizeDefault() throws Exception {
final PostgresSourceReader reader = createReader(3);
final List<Long> completedCheckpointIds = new ArrayList<>();
MockPostgresDialect.setNotifyCheckpointCompleteCallback(
id -> completedCheckpointIds.add(id));
reader.notifyCheckpointComplete(103L);
assertThat(completedCheckpointIds).isEmpty();
reader.notifyCheckpointComplete(102L);
assertThat(completedCheckpointIds).isEmpty();
reader.notifyCheckpointComplete(101L);
assertThat(completedCheckpointIds).isEmpty();
reader.notifyCheckpointComplete(104L);
assertThat(completedCheckpointIds).containsExactly(101L);
}
private PostgresSourceReader createReader(final int lsnCommitCheckpointsDelay)
throws Exception {
final PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
final PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
configFactory.hostname("host");
configFactory.database("pgdb");
configFactory.username("username");
configFactory.password("password");
configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
final TestTable customerTable =
new TestTable(
"pgdb",
"customer",
"customers",
ResolvedSchema.of(Column.physical("id", BIGINT())));
final DebeziumDeserializationSchema<?> deserializer = customerTable.getDeserializer();
MockPostgresDialect dialect = new MockPostgresDialect(configFactory.create(0));
final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
new PostgresSourceBuilder.PostgresIncrementalSource<>(
configFactory, checkNotNull(deserializer), offsetFactory, dialect);
return source.createReader(new TestingReaderContext());
}
}

@@ -64,7 +64,8 @@ public class MockPostgreSQLTableSource extends PostgreSQLTableSource {
(String) get(postgreSQLTableSource, "chunkKeyColumn"),
(boolean) get(postgreSQLTableSource, "closeIdleReaders"),
(boolean) get(postgreSQLTableSource, "skipSnapshotBackfill"),
(boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"),
(int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"));
}
@Override

@@ -59,6 +59,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSou
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -151,7 +152,8 @@ public class PostgreSQLTableFactoryTest {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -196,7 +198,8 @@ public class PostgreSQLTableFactoryTest {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
true,
true,
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -239,7 +242,8 @@ public class PostgreSQLTableFactoryTest {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
@@ -292,7 +296,8 @@ public class PostgreSQLTableFactoryTest {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -335,7 +340,8 @@ public class PostgreSQLTableFactoryTest {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
assertEquals(expectedSource, actualSource);
}
