diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java
index c41eee8af..402ee609f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java
@@ -79,4 +79,12 @@ public interface DataSourceDialect<C extends SourceConfig>
      */
     @Override
     default void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+    /**
+     * Notifies the dialect of a completed checkpoint together with the offset recorded for that
+     * checkpoint. For example, the PostgreSQL dialect commits the checkpoint's LSN to its slot.
+     */
+    default void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
+        notifyCheckpointComplete(checkpointId);
+    }
 }
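As a reading aid (not part of the patch): because the new two-argument default delegates to the existing one-argument callback, dialects that ignore offsets keep working unchanged, and a dialect that needs the offset simply overrides the new variant. A minimal sketch, with a hypothetical `ExampleDialect` and `commitToExternalSystem` helper; all other interface methods are left abstract:

```java
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;

// Hypothetical dialect, for illustration only; the remaining
// DataSourceDialect methods are omitted by keeping the class abstract.
public abstract class ExampleDialect implements DataSourceDialect<JdbcSourceConfig> {

    @Override
    public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
        // The offset may be null, e.g. while the reader is still in the
        // snapshot phase and no stream split was part of this checkpoint.
        if (offset != null) {
            commitToExternalSystem(offset); // hypothetical helper
        }
    }

    /** Hypothetical: how the offset is committed is dialect-specific. */
    protected abstract void commitToExternalSystem(Offset offset) throws Exception;
}
```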
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java
index aeb847433..8b490e8d6 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java
@@ -78,7 +78,7 @@ public class IncrementalSource<T, C extends SourceConfig>
     // This field is introduced for testing purpose, for example testing if changes made in the
     // snapshot phase are correctly backfilled into the snapshot by registering a pre high watermark
     // hook for generating changes.
-    private SnapshotPhaseHooks snapshotHooks = SnapshotPhaseHooks.empty();
+    protected SnapshotPhaseHooks snapshotHooks = SnapshotPhaseHooks.empty();

     public IncrementalSource(
             SourceConfig.Factory<C> configFactory,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReader.java
index e88086a86..c22a9e329 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReader.java
@@ -77,7 +77,7 @@ public class IncrementalSourceReader<T, C extends SourceConfig>
     private final int subtaskId;
     private final SourceSplitSerializer sourceSplitSerializer;
     private final C sourceConfig;
-    private final DataSourceDialect<C> dialect;
+    protected final DataSourceDialect<C> dialect;

     public IncrementalSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReaderWithCommit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReaderWithCommit.java
new file mode 100644
index 000000000..f538bbcaf
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReaderWithCommit.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.base.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import com.ververica.cdc.connectors.base.config.SourceConfig;
+import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
+import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
+import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+
+/**
+ * An {@link IncrementalSourceReader} that records the starting offset of the {@link StreamSplit}
+ * at each checkpoint, so that a completed checkpoint's offset can be committed to the CDC source.
+ */
+public class IncrementalSourceReaderWithCommit extends IncrementalSourceReader {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(IncrementalSourceReaderWithCommit.class);
+
+    private final TreeMap<Long, Offset> lastCheckPointOffset;
+    private long maxCompletedCheckpointId;
+
+    public IncrementalSourceReaderWithCommit(
+            FutureCompletingBlockingQueue elementQueue,
+            Supplier supplier,
+            RecordEmitter recordEmitter,
+            Configuration config,
+            SourceReaderContext context,
+            SourceConfig sourceConfig,
+            SourceSplitSerializer sourceSplitSerializer,
+            DataSourceDialect dialect) {
+        super(
+                elementQueue,
+                supplier,
+                recordEmitter,
+                config,
+                context,
+                sourceConfig,
+                sourceSplitSerializer,
+                dialect);
+        this.lastCheckPointOffset = new TreeMap<>();
+        this.maxCompletedCheckpointId = 0;
+    }
+
+    @Override
+    public List<SourceSplitBase> snapshotState(long checkpointId) {
+        final List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);
+
+        stateSplits.stream()
+                .filter(SourceSplitBase::isStreamSplit)
+                .findAny()
+                .map(SourceSplitBase::asStreamSplit)
+                .ifPresent(
+                        streamSplit -> {
+                            lastCheckPointOffset.put(checkpointId, streamSplit.getStartingOffset());
+                            LOG.debug(
+                                    "Starting offset of stream split is: {}, and checkpoint id is {}.",
+                                    streamSplit.getStartingOffset(),
+                                    checkpointId);
+                        });
+
+        return stateSplits;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // checkpointId might be for a checkpoint that was triggered earlier. See
+        // CheckpointListener#notifyCheckpointComplete(long).
+        if (checkpointId > maxCompletedCheckpointId) {
+            Offset offset = lastCheckPointOffset.get(checkpointId);
+            dialect.notifyCheckpointComplete(checkpointId, offset);
+            lastCheckPointOffset.headMap(checkpointId, true).clear();
+            maxCompletedCheckpointId = checkpointId;
+        }
+    }
+}
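As a reading aid: snapshotState stores the stream split's starting offset under the checkpoint id, and notifyCheckpointComplete looks it up and then prunes every entry at or below the completed id, since completions can arrive out of order and a later checkpoint subsumes earlier ones. A self-contained sketch of the same TreeMap pattern, with Long standing in for the Offset type:

```java
import java.util.TreeMap;

// Minimal sketch of the checkpoint-offset bookkeeping above,
// with Long standing in for the connector's Offset type.
public class CheckpointOffsetTracker {
    private final TreeMap<Long, Long> offsetsByCheckpoint = new TreeMap<>();
    private long maxCompletedCheckpointId = 0;

    // Called when a checkpoint is taken: remember the current offset.
    public void onSnapshot(long checkpointId, long offset) {
        offsetsByCheckpoint.put(checkpointId, offset);
    }

    // Called when a checkpoint completes; completions may arrive out of
    // order, so notifications at or below the highest completed id are ignored.
    public Long onComplete(long checkpointId) {
        if (checkpointId <= maxCompletedCheckpointId) {
            return null;
        }
        Long offset = offsetsByCheckpoint.get(checkpointId);
        // Drop all entries up to and including the completed checkpoint.
        offsetsByCheckpoint.headMap(checkpointId, true).clear();
        maxCompletedCheckpointId = checkpointId;
        return offset;
    }

    public static void main(String[] args) {
        CheckpointOffsetTracker tracker = new CheckpointOffsetTracker();
        tracker.onSnapshot(1, 100L);
        tracker.onSnapshot(2, 200L);
        System.out.println(tracker.onComplete(2)); // 200
        System.out.println(tracker.onComplete(1)); // null: subsumed by checkpoint 2
    }
}
```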
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java
index 325dc9cf7..806745e0b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java
@@ -214,9 +214,9 @@ public class PostgresDialect implements JdbcDataSourceDialect {
     }

     @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
         if (streamFetchTask != null) {
-            streamFetchTask.commitCurrentOffset();
+            streamFetchTask.commitCurrentOffset(offset);
         }
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index 42f4f7e0b..9a054d49c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -16,18 +16,27 @@

 package com.ververica.cdc.connectors.postgres.source;

+import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.util.FlinkRuntimeException;

 import com.ververica.cdc.common.annotation.Experimental;
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
 import com.ververica.cdc.connectors.base.options.StartupOptions;
 import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner;
 import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
 import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner;
 import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState;
 import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
 import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
 import com.ververica.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator;
@@ -38,6 +47,7 @@ import io.debezium.relational.TableId;

 import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Supplier;

 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -285,6 +295,36 @@ public class PostgresSourceBuilder<T> {
             super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect);
         }

+        @Override
+        public IncrementalSourceReader<T, JdbcSourceConfig> createReader(
+                SourceReaderContext readerContext) throws Exception {
+            // create source config for the given subtask (e.g. unique server id)
+            JdbcSourceConfig sourceConfig = configFactory.create(readerContext.getIndexOfSubtask());
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
+                    new FutureCompletingBlockingQueue<>();
+
+            final SourceReaderMetrics sourceReaderMetrics =
+                    new SourceReaderMetrics(readerContext.metricGroup());
+
+            sourceReaderMetrics.registerMetrics();
+            Supplier<IncrementalSourceSplitReader<JdbcSourceConfig>> splitReaderSupplier =
+                    () ->
+                            new IncrementalSourceSplitReader<>(
+                                    readerContext.getIndexOfSubtask(),
+                                    dataSourceDialect,
+                                    sourceConfig,
+                                    snapshotHooks);
+            return new IncrementalSourceReaderWithCommit(
+                    elementsQueue,
+                    splitReaderSupplier,
+                    createRecordEmitter(sourceConfig, sourceReaderMetrics),
+                    readerContext.getConfiguration(),
+                    readerContext,
+                    sourceConfig,
+                    sourceSplitSerializer,
+                    dataSourceDialect);
+        }
+
         @Override
         public SplitEnumerator createEnumerator(
                 SplitEnumeratorContext enumContext) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
index da1b3e3c1..30cc32bb2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
@@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.postgres.source.fetch;
 import org.apache.flink.util.FlinkRuntimeException;

 import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
+import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
 import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
@@ -42,6 +43,8 @@ import io.debezium.util.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;

@@ -119,7 +122,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
         return split;
     }

-    public void commitCurrentOffset() {
+    public void commitCurrentOffset(@Nullable Offset offsetToCommit) {
         if (streamSplitReadTask != null && streamSplitReadTask.offsetContext != null) {
             PostgresOffsetContext postgresOffsetContext = streamSplitReadTask.offsetContext;

@@ -129,6 +132,21 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
             postgresOffsetContext
                     .getOffset()
                     .get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+
+            if (offsetToCommit != null) {
+                // Commit the checkpoint's LSN to the slot instead of postgresOffsetContext's LSN.
+                // Suppose the checkpoint succeeds and a table UPDATE message (represented as a
+                // BEGIN/UPDATE/COMMIT WAL event sequence) arrives before notifyCheckpointComplete
+                // is called: the LSN of postgresOffsetContext is then updated to the LSN of the
+                // COMMIT event. Committing that COMMIT LSN to the slot would be incorrect, because
+                // if a failover occurs after the successful commit, Flink recovers from the
+                // checkpoint but consumes WAL starting from the slot LSN, i.e. the LSN of the
+                // COMMIT event, rather than from the checkpoint's LSN. The UPDATE message would
+                // therefore never be consumed, resulting in data loss.
+                commitLsn = ((PostgresOffset) offsetToCommit).getLsn().asLong();
+            }
+
             if (commitLsn != null
                     && (lastCommitLsn == null
                             || Lsn.valueOf(commitLsn).compareTo(Lsn.valueOf(lastCommitLsn)) > 0)) {
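The comment above can be made concrete with hypothetical LSN values: checkpoint N records LSN 100; a transaction then writes BEGIN/UPDATE/COMMIT at LSNs 110/111/112 and advances postgresOffsetContext to 112 before notifyCheckpointComplete(N) fires. Committing 112 would let PostgreSQL discard WAL through 112 while a failover restores Flink to LSN 100, so the events at 110-112 could never be replayed; committing the checkpoint's own LSN (100) keeps them available. The guard at the end additionally keeps the slot LSN monotonic, as in this sketch (plain longs standing in for Debezium's Lsn type):

```java
// Sketch of the forward-only slot-commit guard used above, with plain longs
// standing in for Debezium's Lsn type. The slot LSN must never move backwards.
public class SlotCommitGuard {
    private Long lastCommittedLsn; // null until the first commit

    /** Commits only if the candidate advances the slot; returns whether it did. */
    public boolean maybeCommit(Long candidateLsn) {
        if (candidateLsn != null
                && (lastCommittedLsn == null || candidateLsn > lastCommittedLsn)) {
            lastCommittedLsn = candidateLsn;
            return true; // the real code forwards the LSN to the replication slot here
        }
        return false;
    }

    public static void main(String[] args) {
        SlotCommitGuard guard = new SlotCommitGuard();
        System.out.println(guard.maybeCommit(100L)); // true: first commit
        System.out.println(guard.maybeCommit(100L)); // false: no progress
        System.out.println(guard.maybeCommit(112L)); // true: moves forward
        System.out.println(guard.maybeCommit(105L)); // false: would move backwards
    }
}
```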
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/MockPostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/MockPostgresDialect.java
new file mode 100644
index 000000000..9a5bb17fe
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/MockPostgresDialect.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.postgres.source;
+
+import org.apache.flink.util.Preconditions;
+
+import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
+import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+
+import java.util.function.Consumer;
+
+/** Mock Postgres dialect used to test the changelog behavior around checkpoints. */
+public class MockPostgresDialect extends PostgresDialect {
+
+    private static Consumer<Long> callback = null;
+
+    public MockPostgresDialect(PostgresSourceConfig sourceConfig) {
+        super(sourceConfig);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
+        if (callback != null) {
+            callback.accept(checkpointId);
+        }
+        super.notifyCheckpointComplete(checkpointId, offset);
+    }
+
+    public static void setNotifyCheckpointCompleteCallback(Consumer<Long> callback) {
+        MockPostgresDialect.callback = Preconditions.checkNotNull(callback);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
index c51434ee3..f08d30326 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;

 import com.ververica.cdc.connectors.base.options.StartupOptions;
 import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
@@ -60,12 +61,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;

@@ -552,6 +555,72 @@ public class PostgresSourceITCase extends PostgresTestBase {
         assertEqualsInAnyOrder(expectedRecords, records);
     }

+    @Test
+    public void testNewLsnCommittedWhenCheckpoint() throws Exception {
+        int parallelism = 1;
+        FailoverType failoverType = FailoverType.JM;
+        FailoverPhase failoverPhase = FailoverPhase.STREAM;
+        String[] captureCustomerTables = new String[] {"customers"};
+        RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+                RestartStrategies.fixedDelayRestart(1, 0);
+        boolean skipSnapshotBackfill = false;
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(restartStrategyConfiguration);
+        String sourceDDL =
+                format(
+                        "CREATE TABLE customers ("
+                                + " id BIGINT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " primary key (id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc-mock',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
" 'table-name' = '%s'," + + " 'scan.startup.mode' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '100'," + + " 'slot.name' = '%s'," + + " 'scan.incremental.snapshot.backfill.skip' = '%s'" + + ")", + customDatabase.getHost(), + customDatabase.getDatabasePort(), + customDatabase.getUsername(), + customDatabase.getPassword(), + customDatabase.getDatabaseName(), + SCHEMA_NAME, + getTableNameRegex(captureCustomerTables), + scanStartupMode, + slotName, + skipSnapshotBackfill); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from customers"); + + // first step: check the snapshot data + if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) { + checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables); + } + + // second step: check the stream data + checkStreamDataWithHook(tableResult, failoverType, failoverPhase, captureCustomerTables); + + tableResult.getJobClient().get().cancel().get(); + + // sleep 1000ms to wait until connections are closed. + Thread.sleep(1000L); + } + private List testBackfillWhenWritingEvents( boolean skipSnapshotBackfill, int fetchSize, @@ -811,6 +880,75 @@ public class PostgresSourceITCase extends PostgresTestBase { assertTrue(!hasNextData(iterator)); } + private void checkStreamDataWithHook( + TableResult tableResult, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables) + throws Exception { + waitUntilJobRunning(tableResult); + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); + + final AtomicLong savedCheckpointId = new AtomicLong(0); + final CountDownLatch countDownLatch = new CountDownLatch(1); + + MockPostgresDialect.setNotifyCheckpointCompleteCallback( + checkpointId -> { + try { + if (savedCheckpointId.get() == 0) { + savedCheckpointId.set(checkpointId); + + for (String tableId : captureCustomerTables) { + makeFirstPartStreamEvents( + getConnection(), + customDatabase.getDatabaseName() + + '.' + + SCHEMA_NAME + + '.' + + tableId); + } + // wait for the stream reading + Thread.sleep(2000L); + + triggerFailover( + failoverType, + jobId, + miniClusterResource.getMiniCluster(), + () -> sleepMs(200)); + countDownLatch.countDown(); + } + } catch (Exception e) { + throw new FlinkRuntimeException(e); + } + }); + + countDownLatch.await(); + waitUntilJobRunning(tableResult); + + if (failoverPhase == FailoverPhase.STREAM) { + triggerFailover( + failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200)); + waitUntilJobRunning(tableResult); + } + for (String tableId : captureCustomerTables) { + makeSecondPartStreamEvents( + getConnection(), + customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' 
+                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        }
+
+        List<String> expectedStreamData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerTables.length; i++) {
+            expectedStreamData.addAll(firstPartStreamEvents);
+            expectedStreamData.addAll(secondPartStreamEvents);
+        }
+        // wait for the stream reading
+        Thread.sleep(2000L);
+
+        assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
+        assertTrue(!hasNextData(iterator));
+    }
+
     private void checkStreamDataWithDDLDuringFailover(
             TableResult tableResult,
             FailoverType failoverType,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/MockPostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/MockPostgreSQLTableFactory.java
new file mode 100644
index 000000000..fe6859436
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/MockPostgreSQLTableFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.postgres.table;
+
+import org.apache.flink.table.connector.source.DynamicTableSource;
+
+/** Mock {@link PostgreSQLTableFactory}. */
+public class MockPostgreSQLTableFactory extends PostgreSQLTableFactory {
+    public static final String IDENTIFIER = "postgres-cdc-mock";
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        PostgreSQLTableSource postgreSQLTableSource =
+                (PostgreSQLTableSource) super.createDynamicTableSource(context);
+
+        return new MockPostgreSQLTableSource(postgreSQLTableSource);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
new file mode 100644
index 000000000..495533708
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.postgres.table;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.base.source.IncrementalSource;
+import com.ververica.cdc.connectors.postgres.source.MockPostgresDialect;
+import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Properties;
+
+/** Mock {@link PostgreSQLTableSource}. */
+public class MockPostgreSQLTableSource extends PostgreSQLTableSource {
+    public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) {
+        super(
+                (ResolvedSchema) get(postgreSQLTableSource, "physicalSchema"),
+                (int) get(postgreSQLTableSource, "port"),
+                (String) get(postgreSQLTableSource, "hostname"),
+                (String) get(postgreSQLTableSource, "database"),
+                (String) get(postgreSQLTableSource, "schemaName"),
+                (String) get(postgreSQLTableSource, "tableName"),
+                (String) get(postgreSQLTableSource, "username"),
+                (String) get(postgreSQLTableSource, "password"),
+                (String) get(postgreSQLTableSource, "pluginName"),
+                (String) get(postgreSQLTableSource, "slotName"),
+                (DebeziumChangelogMode) get(postgreSQLTableSource, "changelogMode"),
+                (Properties) get(postgreSQLTableSource, "dbzProperties"),
+                (boolean) get(postgreSQLTableSource, "enableParallelRead"),
+                (int) get(postgreSQLTableSource, "splitSize"),
+                (int) get(postgreSQLTableSource, "splitMetaGroupSize"),
+                (int) get(postgreSQLTableSource, "fetchSize"),
+                (Duration) get(postgreSQLTableSource, "connectTimeout"),
+                (int) get(postgreSQLTableSource, "connectMaxRetries"),
+                (int) get(postgreSQLTableSource, "connectionPoolSize"),
+                (double) get(postgreSQLTableSource, "distributionFactorUpper"),
+                (double) get(postgreSQLTableSource, "distributionFactorLower"),
+                (Duration) get(postgreSQLTableSource, "heartbeatInterval"),
+                (StartupOptions) get(postgreSQLTableSource, "startupOptions"),
+                (String) get(postgreSQLTableSource, "chunkKeyColumn"),
+                (boolean) get(postgreSQLTableSource, "closeIdleReaders"),
+                (boolean) get(postgreSQLTableSource, "skipSnapshotBackfill"));
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        ScanRuntimeProvider scanRuntimeProvider = super.getScanRuntimeProvider(scanContext);
+
+        if (scanRuntimeProvider instanceof SourceProvider) {
+            Source<RowData, ?, ?> source = ((SourceProvider) scanRuntimeProvider).createSource();
+            Preconditions.checkState(
+                    source instanceof PostgresSourceBuilder.PostgresIncrementalSource);
+
+            PostgresSourceBuilder.PostgresIncrementalSource<RowData> incrementalSource =
+                    (PostgresSourceBuilder.PostgresIncrementalSource<RowData>) source;
+
+            try {
+                Field configFactoryField =
+                        IncrementalSource.class.getDeclaredField("configFactory");
+                configFactoryField.setAccessible(true);
+                PostgresSourceConfigFactory configFactory =
+                        (PostgresSourceConfigFactory) configFactoryField.get(incrementalSource);
+                MockPostgresDialect mockPostgresDialect =
+                        new MockPostgresDialect(configFactory.create(0));
+
+                Field dataSourceDialectField =
+                        IncrementalSource.class.getDeclaredField("dataSourceDialect");
+                dataSourceDialectField.setAccessible(true);
+                dataSourceDialectField.set(incrementalSource, mockPostgresDialect);
+            } catch (NoSuchFieldException | IllegalArgumentException | IllegalAccessException e) {
+                throw new FlinkRuntimeException(e);
+            }
+        }
+
+        return scanRuntimeProvider;
+    }
+
+    private static Object get(PostgreSQLTableSource postgreSQLTableSource, String name) {
+        try {
+            Field field = postgreSQLTableSource.getClass().getDeclaredField(name);
+            field.setAccessible(true);
+            return field.get(postgreSQLTableSource);
+        } catch (NoSuchFieldException | IllegalArgumentException | IllegalAccessException e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..51f1231e3
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,14 @@
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+com.ververica.cdc.connectors.postgres.table.MockPostgreSQLTableFactory
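For context on how the change reaches users: PostgresIncrementalSource#createReader now returns an IncrementalSourceReaderWithCommit, so a job built through the connector's documented builder API picks up checkpoint-aligned slot commits without code changes. A typical job definition (connection values are placeholders):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class PostgresParallelSourceExample {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; adjust for a real database.
        JdbcIncrementalSource<String> postgresIncrementalSource =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("localhost")
                        .port(5432)
                        .database("postgres")
                        .schemaList("inventory")
                        .tableList("inventory.customers")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flink")
                        .decodingPluginName("pgoutput")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Slot commits now happen on checkpoint completion, so checkpointing
        // must be enabled for the slot's confirmed LSN to advance.
        env.enableCheckpointing(3000);
        env.fromSource(
                        postgresIncrementalSource,
                        WatermarkStrategy.noWatermarks(),
                        "PostgresParallelSource")
                .print();
        env.execute("Postgres CDC with checkpoint-aligned slot commits");
    }
}
```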