[cdc-connector][postgres] Fix data loss problem when a new LSN is committed to the slot between snapshotState and notifyCheckpointComplete (#2539)

This closes #2538.
Co-authored-by: sammieliu <sammieliu@tencent.com>
lzshlzsh 1 year ago committed by GitHub
parent e3d6c7e0aa
commit 9ce36569fb

@@ -79,4 +79,12 @@ public interface DataSourceDialect<C extends SourceConfig>
*/
@Override
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
/**
* The offset corresponding to the checkpointId may be needed here. For example, the
* checkpoint's LSN should be committed to the PostgreSQL replication slot.
*/
default void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
notifyCheckpointComplete(checkpointId);
}
}
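
The new default keeps existing dialects compatible: a dialect that only overrides the single-argument method is still notified, while one that overrides the two-argument variant also receives the checkpointed offset. A self-contained sketch of that delegation pattern (MiniDialect is a simplified stand-in for illustration, not the real interface):

// Sketch only: shows how the new default method falls back to the old one.
public class DefaultDelegationSketch {

    interface MiniDialect {
        default void notifyCheckpointComplete(long checkpointId) throws Exception {}

        // New variant: delegates to the old method unless a dialect overrides it.
        default void notifyCheckpointComplete(long checkpointId, Long offset) throws Exception {
            notifyCheckpointComplete(checkpointId);
        }
    }

    public static void main(String[] args) throws Exception {
        MiniDialect legacy = new MiniDialect() {
            @Override
            public void notifyCheckpointComplete(long checkpointId) {
                System.out.println("legacy dialect notified for checkpoint " + checkpointId);
            }
        };

        MiniDialect offsetAware = new MiniDialect() {
            @Override
            public void notifyCheckpointComplete(long checkpointId, Long offset) {
                System.out.println("commit offset " + offset + " for checkpoint " + checkpointId);
            }
        };

        // Callers always use the two-argument form; both dialects behave correctly.
        legacy.notifyCheckpointComplete(1L, 100L);
        offsetAware.notifyCheckpointComplete(1L, 100L);
    }
}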

@@ -78,7 +78,7 @@ public class IncrementalSource<T, C extends SourceConfig>
// This field is introduced for testing purposes, for example testing if changes made in the
// snapshot phase are correctly backfilled into the snapshot by registering a pre high watermark
// hook for generating changes.
private SnapshotPhaseHooks snapshotHooks = SnapshotPhaseHooks.empty();
protected SnapshotPhaseHooks snapshotHooks = SnapshotPhaseHooks.empty();
public IncrementalSource(
SourceConfig.Factory<C> configFactory,

@@ -77,7 +77,7 @@ public class IncrementalSourceReader<T, C extends SourceConfig>
private final int subtaskId;
private final SourceSplitSerializer sourceSplitSerializer;
private final C sourceConfig;
private final DataSourceDialect<C> dialect;
protected final DataSourceDialect<C> dialect;
public IncrementalSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,

@@ -0,0 +1,100 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.base.source.reader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
/**
* Records the checkpointed offset of the {@link StreamSplit} (for PostgreSQL, the LSN), which is committed back to the CDC source when the checkpoint completes.
*/
public class IncrementalSourceReaderWithCommit extends IncrementalSourceReader {
private static final Logger LOG =
LoggerFactory.getLogger(IncrementalSourceReaderWithCommit.class);
private final TreeMap<Long, Offset> lastCheckPointOffset;
private long maxCompletedCheckpointId;
public IncrementalSourceReaderWithCommit(
FutureCompletingBlockingQueue elementQueue,
Supplier supplier,
RecordEmitter recordEmitter,
Configuration config,
SourceReaderContext context,
SourceConfig sourceConfig,
SourceSplitSerializer sourceSplitSerializer,
DataSourceDialect dialect) {
super(
elementQueue,
supplier,
recordEmitter,
config,
context,
sourceConfig,
sourceSplitSerializer,
dialect);
this.lastCheckPointOffset = new TreeMap<>();
this.maxCompletedCheckpointId = 0;
}
@Override
public List<SourceSplitBase> snapshotState(long checkpointId) {
final List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);
stateSplits.stream()
.filter(SourceSplitBase::isStreamSplit)
.findAny()
.map(SourceSplitBase::asStreamSplit)
.ifPresent(
streamSplit -> {
lastCheckPointOffset.put(checkpointId, streamSplit.getStartingOffset());
LOG.debug(
"Starting offset of stream split is: {}, and checkpoint id is {}.",
streamSplit.getStartingOffset(),
checkpointId);
});
return stateSplits;
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// checkpointId might be for a checkpoint that was triggered earlier. see
// CheckpointListener#notifyCheckpointComplete(long).
if (checkpointId > maxCompletedCheckpointId) {
Offset offset = lastCheckPointOffset.get(checkpointId);
dialect.notifyCheckpointComplete(checkpointId, offset);
lastCheckPointOffset.headMap(checkpointId, true).clear();
maxCompletedCheckpointId = checkpointId;
}
}
}
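
For reference, a standalone sketch of the bookkeeping above with offsets simplified to plain longs: snapshotState records the stream split's starting offset per checkpoint, and a (possibly late) notifyCheckpointComplete commits that offset and prunes every entry up to and including the completed checkpoint.

import java.util.TreeMap;

// Sketch only: the checkpoint-id -> offset bookkeeping used by the reader above.
public class CheckpointOffsetBookkeepingSketch {
    public static void main(String[] args) {
        TreeMap<Long, Long> lastCheckpointOffset = new TreeMap<>();
        long maxCompletedCheckpointId = 0;

        // snapshotState(checkpointId): remember the stream split's starting offset.
        lastCheckpointOffset.put(1L, 100L);
        lastCheckpointOffset.put(2L, 250L);
        lastCheckpointOffset.put(3L, 400L);

        // notifyCheckpointComplete(2): may refer to a checkpoint triggered earlier.
        long completedCheckpointId = 2L;
        if (completedCheckpointId > maxCompletedCheckpointId) {
            Long offset = lastCheckpointOffset.get(completedCheckpointId); // 250
            System.out.println("commit offset " + offset + " for checkpoint " + completedCheckpointId);

            // Prune everything up to and including the completed checkpoint.
            lastCheckpointOffset.headMap(completedCheckpointId, true).clear();
            maxCompletedCheckpointId = completedCheckpointId;
        }

        System.out.println(lastCheckpointOffset); // {3=400} stays for a later notification
    }
}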

@@ -214,9 +214,9 @@ public class PostgresDialect implements JdbcDataSourceDialect {
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
if (streamFetchTask != null) {
streamFetchTask.commitCurrentOffset();
streamFetchTask.commitCurrentOffset(offset);
}
}

@@ -16,18 +16,27 @@
package com.ververica.cdc.connectors.postgres.source;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.common.annotation.Experimental;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner;
import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner;
import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import com.ververica.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator;
@@ -38,6 +47,7 @@ import io.debezium.relational.TableId;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -285,6 +295,36 @@ public class PostgresSourceBuilder<T> {
super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect);
}
@Override
public IncrementalSourceReader<T, JdbcSourceConfig> createReader(
SourceReaderContext readerContext) throws Exception {
// create source config for the given subtask (e.g. unique server id)
JdbcSourceConfig sourceConfig = configFactory.create(readerContext.getIndexOfSubtask());
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
new FutureCompletingBlockingQueue<>();
final SourceReaderMetrics sourceReaderMetrics =
new SourceReaderMetrics(readerContext.metricGroup());
sourceReaderMetrics.registerMetrics();
Supplier<IncrementalSourceSplitReader<JdbcSourceConfig>> splitReaderSupplier =
() ->
new IncrementalSourceSplitReader<>(
readerContext.getIndexOfSubtask(),
dataSourceDialect,
sourceConfig,
snapshotHooks);
return new IncrementalSourceReaderWithCommit(
elementsQueue,
splitReaderSupplier,
createRecordEmitter(sourceConfig, sourceReaderMetrics),
readerContext.getConfiguration(),
readerContext,
sourceConfig,
sourceSplitSerializer,
dataSourceDialect);
}
@Override
public SplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(
SplitEnumeratorContext<SourceSplitBase> enumContext) {

@@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.postgres.source.fetch;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
@@ -42,6 +43,8 @@ import io.debezium.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
@@ -119,7 +122,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
return split;
}
public void commitCurrentOffset() {
public void commitCurrentOffset(@Nullable Offset offsetToCommit) {
if (streamSplitReadTask != null && streamSplitReadTask.offsetContext != null) {
PostgresOffsetContext postgresOffsetContext = streamSplitReadTask.offsetContext;
@@ -129,6 +132,21 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
postgresOffsetContext
.getOffset()
.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
if (offsetToCommit != null) {
// Commit the checkpoint's LSN to the slot rather than postgresOffsetContext's LSN.
// If the checkpoint succeeds and a table UPDATE (a BEGIN/UPDATE/COMMIT WAL event
// sequence) arrives before notifyCheckpointComplete is called, the LSN of
// postgresOffsetContext is advanced to the LSN of the COMMIT event. Committing that
// COMMIT LSN to the slot would be incorrect: if a failover occurs after the LSN has
// been committed, Flink recovers from the checkpoint but consumes WAL starting from
// the slot's LSN (the COMMIT event's LSN) rather than from the checkpoint's LSN, so
// the UPDATE messages can never be consumed, resulting in data loss.
commitLsn = ((PostgresOffset) offsetToCommit).getLsn().asLong();
}
if (commitLsn != null
&& (lastCommitLsn == null
|| Lsn.valueOf(commitLsn).compareTo(Lsn.valueOf(lastCommitLsn)) > 0)) {
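
A standalone sketch of the guard above with illustrative LSN values: the checkpoint's LSN is flushed to the replication slot only when it is strictly greater than the last committed LSN, so the slot never moves backwards even if an older checkpoint completes late.

import io.debezium.connector.postgresql.connection.Lsn;

// Sketch only: mirrors the commit guard above with made-up LSN values.
public class SlotCommitGuardSketch {
    public static void main(String[] args) {
        Long lastCommitLsn = 1000L; // LSN already flushed to the replication slot
        Long commitLsn = 900L;      // checkpoint's LSN captured at snapshotState time

        if (commitLsn != null
                && (lastCommitLsn == null
                        || Lsn.valueOf(commitLsn).compareTo(Lsn.valueOf(lastCommitLsn)) > 0)) {
            System.out.println("flush " + Lsn.valueOf(commitLsn) + " to the slot");
        } else {
            // 900 < 1000: skip, the slot already covers this position.
            System.out.println("skip commit, slot already at " + Lsn.valueOf(lastCommitLsn));
        }
    }
}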

@@ -0,0 +1,46 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres.source;
import org.apache.flink.util.Preconditions;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import java.util.function.Consumer;
/** Mock Postgres dialect used to test changelog consumption around checkpoint completion. */
public class MockPostgresDialect extends PostgresDialect {
private static Consumer<Long> callback = null;
public MockPostgresDialect(PostgresSourceConfig sourceConfig) {
super(sourceConfig);
}
@Override
public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
if (callback != null) {
callback.accept(checkpointId);
}
super.notifyCheckpointComplete(checkpointId, offset);
}
public static void setNotifyCheckpointCompleteCallback(Consumer<Long> callback) {
MockPostgresDialect.callback = Preconditions.checkNotNull(callback);
}
}

@@ -33,6 +33,7 @@ import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
@@ -60,12 +61,14 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -552,6 +555,72 @@ public class PostgresSourceITCase extends PostgresTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
@Test
public void testNewLsnCommittedWhenCheckpoint() throws Exception {
int parallelism = 1;
FailoverType failoverType = FailoverType.JM;
FailoverPhase failoverPhase = FailoverPhase.STREAM;
String[] captureCustomerTables = new String[] {"customers"};
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
RestartStrategies.fixedDelayRestart(1, 0);
boolean skipSnapshotBackfill = false;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(restartStrategyConfiguration);
String sourceDDL =
format(
"CREATE TABLE customers ("
+ " id BIGINT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'postgres-cdc-mock',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ")",
customDatabase.getHost(),
customDatabase.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
SCHEMA_NAME,
getTableNameRegex(captureCustomerTables),
scanStartupMode,
slotName,
skipSnapshotBackfill);
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
// first step: check the snapshot data
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
}
// second step: check the stream data
checkStreamDataWithHook(tableResult, failoverType, failoverPhase, captureCustomerTables);
tableResult.getJobClient().get().cancel().get();
// sleep 1000ms to wait until connections are closed.
Thread.sleep(1000L);
}
private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill,
int fetchSize,
@@ -811,6 +880,75 @@ public class PostgresSourceITCase extends PostgresTestBase {
assertTrue(!hasNextData(iterator));
}
private void checkStreamDataWithHook(
TableResult tableResult,
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables)
throws Exception {
waitUntilJobRunning(tableResult);
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
final AtomicLong savedCheckpointId = new AtomicLong(0);
final CountDownLatch countDownLatch = new CountDownLatch(1);
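// The hook below fires inside notifyCheckpointComplete, before the checkpoint's offset is
// committed to the replication slot: it injects the first batch of stream changes, waits
// until the reader has consumed them (advancing the in-memory offset context past them),
// and then triggers a failover. Without the fix the slot would be advanced to the
// post-change LSN and the recovered job could never read those changes again.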
MockPostgresDialect.setNotifyCheckpointCompleteCallback(
checkpointId -> {
try {
if (savedCheckpointId.get() == 0) {
savedCheckpointId.set(checkpointId);
for (String tableId : captureCustomerTables) {
makeFirstPartStreamEvents(
getConnection(),
customDatabase.getDatabaseName()
+ '.'
+ SCHEMA_NAME
+ '.'
+ tableId);
}
// wait for the stream reading
Thread.sleep(2000L);
triggerFailover(
failoverType,
jobId,
miniClusterResource.getMiniCluster(),
() -> sleepMs(200));
countDownLatch.countDown();
}
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
});
countDownLatch.await();
waitUntilJobRunning(tableResult);
if (failoverPhase == FailoverPhase.STREAM) {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200));
waitUntilJobRunning(tableResult);
}
for (String tableId : captureCustomerTables) {
makeSecondPartStreamEvents(
getConnection(),
customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
}
List<String> expectedStreamData = new ArrayList<>();
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedStreamData.addAll(firstPartStreamEvents);
expectedStreamData.addAll(secondPartStreamEvents);
}
// wait for the stream reading
Thread.sleep(2000L);
assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
assertTrue(!hasNextData(iterator));
}
private void checkStreamDataWithDDLDuringFailover(
TableResult tableResult,
FailoverType failoverType,

@@ -0,0 +1,37 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres.table;
import org.apache.flink.table.connector.source.DynamicTableSource;
/** Mock {@link PostgreSQLTableFactory}. */
public class MockPostgreSQLTableFactory extends PostgreSQLTableFactory {
public static final String IDENTIFIER = "postgres-cdc-mock";
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
PostgreSQLTableSource postgreSQLTableSource =
(PostgreSQLTableSource) super.createDynamicTableSource(context);
return new MockPostgreSQLTableSource(postgreSQLTableSource);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
}

@@ -0,0 +1,111 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres.table;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.IncrementalSource;
import com.ververica.cdc.connectors.postgres.source.MockPostgresDialect;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Properties;
/** Mock {@link PostgreSQLTableSource}. */
public class MockPostgreSQLTableSource extends PostgreSQLTableSource {
public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) {
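// Copy every constructor argument from the wrapped table source by reading its private
// fields reflectively (see the get(...) helper below).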
super(
(ResolvedSchema) get(postgreSQLTableSource, "physicalSchema"),
(int) get(postgreSQLTableSource, "port"),
(String) get(postgreSQLTableSource, "hostname"),
(String) get(postgreSQLTableSource, "database"),
(String) get(postgreSQLTableSource, "schemaName"),
(String) get(postgreSQLTableSource, "tableName"),
(String) get(postgreSQLTableSource, "username"),
(String) get(postgreSQLTableSource, "password"),
(String) get(postgreSQLTableSource, "pluginName"),
(String) get(postgreSQLTableSource, "slotName"),
(DebeziumChangelogMode) get(postgreSQLTableSource, "changelogMode"),
(Properties) get(postgreSQLTableSource, "dbzProperties"),
(boolean) get(postgreSQLTableSource, "enableParallelRead"),
(int) get(postgreSQLTableSource, "splitSize"),
(int) get(postgreSQLTableSource, "splitMetaGroupSize"),
(int) get(postgreSQLTableSource, "fetchSize"),
(Duration) get(postgreSQLTableSource, "connectTimeout"),
(int) get(postgreSQLTableSource, "connectMaxRetries"),
(int) get(postgreSQLTableSource, "connectionPoolSize"),
(double) get(postgreSQLTableSource, "distributionFactorUpper"),
(double) get(postgreSQLTableSource, "distributionFactorLower"),
(Duration) get(postgreSQLTableSource, "heartbeatInterval"),
(StartupOptions) get(postgreSQLTableSource, "startupOptions"),
(String) get(postgreSQLTableSource, "chunkKeyColumn"),
(boolean) get(postgreSQLTableSource, "closeIdleReaders"),
(boolean) get(postgreSQLTableSource, "skipSnapshotBackfill"));
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
ScanRuntimeProvider scanRuntimeProvider = super.getScanRuntimeProvider(scanContext);
if (scanRuntimeProvider instanceof SourceProvider) {
Source<RowData, ?, ?> source = ((SourceProvider) scanRuntimeProvider).createSource();
Preconditions.checkState(
source instanceof PostgresSourceBuilder.PostgresIncrementalSource);
PostgresSourceBuilder.PostgresIncrementalSource incrementalSource =
(PostgresSourceBuilder.PostgresIncrementalSource) source;
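// Swap the source's private dataSourceDialect for a MockPostgresDialect via reflection,
// so that checkpoint-complete notifications are routed through the test hook.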
try {
Field configFactoryField =
IncrementalSource.class.getDeclaredField("configFactory");
configFactoryField.setAccessible(true);
PostgresSourceConfigFactory configFactory =
(PostgresSourceConfigFactory) configFactoryField.get(incrementalSource);
MockPostgresDialect mockPostgresDialect =
new MockPostgresDialect(configFactory.create(0));
Field dataSourceDialectField =
IncrementalSource.class.getDeclaredField("dataSourceDialect");
dataSourceDialectField.setAccessible(true);
dataSourceDialectField.set(incrementalSource, mockPostgresDialect);
} catch (NoSuchFieldException | IllegalArgumentException | IllegalAccessException e) {
throw new FlinkRuntimeException(e);
}
}
return scanRuntimeProvider;
}
private static Object get(PostgreSQLTableSource postgreSQLTableSource, String name) {
try {
Field field = postgreSQLTableSource.getClass().getDeclaredField(name);
field.setAccessible(true);
return field.get(postgreSQLTableSource);
} catch (NoSuchFieldException | IllegalArgumentException | IllegalAccessException e) {
throw new FlinkRuntimeException(e);
}
}
}

@@ -0,0 +1,14 @@
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
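#
# Registers the mock factory with Flink's table factory discovery (Java SPI), so that the
# 'postgres-cdc-mock' connector used in PostgresSourceITCase resolves to the factory below.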
com.ververica.cdc.connectors.postgres.table.MockPostgreSQLTableFactory