[cdc-connector][Postgres] Add SNAPSHOT_ONLY mode for Postgres CDC Source

pull/2842/head
Hongshun Wang 1 year ago committed by Leonard Xu
parent 1cbdb4f8ae
commit 0e4f835f6d
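For orientation before the diff: a minimal sketch of how the new mode is intended to be used from the DataStream API. `StartupOptions.snapshot()` and the builder's `startupOptions(...)` method appear in the changes below; the connection values, schema/table names, and the `JsonDebeziumDeserializationSchema` choice are placeholders, not part of this commit.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class SnapshotOnlyExample {
    public static void main(String[] args) throws Exception {
        PostgresIncrementalSource<String> source =
                PostgresIncrementalSource.<String>builder()
                        .hostname("localhost") // placeholder connection settings
                        .port(5432)
                        .database("postgres")
                        .schemaList("inventory")
                        .tableList("inventory.customers")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flink_slot")
                        .decodingPluginName("pgoutput")
                        // the new mode: take a consistent snapshot, then finish
                        .startupOptions(StartupOptions.snapshot())
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Postgres Snapshot-Only Source")
                .print();
        env.execute();
    }
}
```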

@@ -142,7 +142,7 @@ public class IncrementalSourceReader<T, C extends SourceConfig>
for (SourceSplitState splitState : finishedSplitIds.values()) {
SourceSplitBase sourceSplit = splitState.toSourceSplit();
if (sourceConfig.getStartupOptions().isSnapshotOnly() && sourceSplit.isStreamSplit()) {
- // when startupMode = SNAPSHOT_ONLY. the stream split could finish.
+ // when startupMode = SNAPSHOT, the stream split can finish.
continue;
}
checkState(

@@ -21,7 +21,6 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.common.annotation.Experimental;
- import com.ververica.cdc.connectors.base.options.StartupMode;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner;
import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
@@ -291,7 +290,7 @@ public class PostgresSourceBuilder<T> {
SplitEnumeratorContext<SourceSplitBase> enumContext) {
final SplitAssigner splitAssigner;
PostgresSourceConfig sourceConfig = (PostgresSourceConfig) configFactory.create(0);
- if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {
+ if (!sourceConfig.getStartupOptions().isStreamOnly()) {
try {
final List<TableId> remainingTables =
dataSourceDialect.discoverDataCollections(sourceConfig);
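The check above is widened from `startupMode == StartupMode.INITIAL` to `!isStreamOnly()`, so both `initial` and the new `snapshot` mode get a snapshot-split assigner. A sketch of the assumed predicate semantics on `StartupOptions` (this mirrors, but is not, the base-module source; the exact set of stream-only modes is an assumption):

```java
// Sketch only: assumed semantics of the StartupOptions predicates used above.
enum StartupMode { INITIAL, EARLIEST_OFFSET, LATEST_OFFSET, SPECIFIC_OFFSETS, TIMESTAMP, SNAPSHOT }

class StartupOptionsSketch {
    final StartupMode startupMode;

    StartupOptionsSketch(StartupMode startupMode) {
        this.startupMode = startupMode;
    }

    boolean isSnapshotOnly() {
        // snapshot phase only, no stream split afterwards
        return startupMode == StartupMode.SNAPSHOT;
    }

    boolean isStreamOnly() {
        // offset- and time-based modes skip the snapshot phase entirely,
        // so only they bypass the snapshot-split assigner
        return startupMode == StartupMode.EARLIEST_OFFSET
                || startupMode == StartupMode.LATEST_OFFSET
                || startupMode == StartupMode.SPECIFIC_OFFSETS
                || startupMode == StartupMode.TIMESTAMP;
    }
}
```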

@@ -20,6 +20,7 @@ import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
@@ -33,6 +34,7 @@ import io.debezium.connector.postgresql.spi.SlotState;
* The Postgres source enumerator that receives split requests and assigns splits to
* source readers.
*/
@Internal
public class PostgresSourceEnumerator extends IncrementalSourceEnumerator {
private final PostgresDialect postgresDialect;

@@ -36,7 +36,6 @@ import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
- import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
@@ -104,8 +103,8 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
ctx.getSnapshotChangeEventSourceMetrics(),
snapshotSplit);
- PostgresChangeEventSourceContext changeEventSourceContext =
- new PostgresChangeEventSourceContext();
+ StoppableChangeEventSourceContext changeEventSourceContext =
+ new StoppableChangeEventSourceContext();
SnapshotResult<PostgresOffsetContext> snapshotResult =
snapshotSplitReadTask.execute(
changeEventSourceContext, ctx.getPartition(), ctx.getOffsetContext());
@@ -146,7 +145,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
snapshotSplit,
((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask());
backfillReadTask.execute(
- new PostgresChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext);
+ new StoppableChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext);
}
/**
@@ -205,18 +204,6 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
}
}
- class PostgresChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext {
- public void finished() {
- taskRunning = false;
- }
- @Override
- public boolean isRunning() {
- return taskRunning;
- }
- }
/** A SnapshotChangeEventSource implementation for Postgres to read snapshot splits. */
public static class PostgresSnapshotSplitReadTask
extends AbstractSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {

@@ -90,8 +90,8 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
sourceFetchContext.getTaskContext(),
sourceFetchContext.getReplicationConnection(),
split);
- StreamSplitChangeEventSourceContext changeEventSourceContext =
- new StreamSplitChangeEventSourceContext();
+ StoppableChangeEventSourceContext changeEventSourceContext =
+ new StoppableChangeEventSourceContext();
streamSplitReadTask.execute(
changeEventSourceContext,
sourceFetchContext.getPartition(),
@@ -102,7 +102,8 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
public void close() {
LOG.debug("stopping StreamFetchTask for split: {}", split);
if (streamSplitReadTask != null) {
- ((StreamSplitChangeEventSourceContext) streamSplitReadTask.context).finished();
+ ((StoppableChangeEventSourceContext) (streamSplitReadTask.context))
+ .stopChangeEventSource();
}
stopped = true;
taskRunning = false;
@@ -144,19 +145,6 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
}
}
- private class StreamSplitChangeEventSourceContext
- implements ChangeEventSource.ChangeEventSourceContext {
- public void finished() {
- taskRunning = false;
- }
- @Override
- public boolean isRunning() {
- return taskRunning;
- }
- }
/** A {@link ChangeEventSource} implementation for Postgres to read streaming changes. */
public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource {
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
@@ -228,7 +216,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
new FlinkRuntimeException("Error processing WAL signal event", e));
}
- ((PostgresScanFetchTask.PostgresChangeEventSourceContext) context).finished();
+ ((StoppableChangeEventSourceContext) context).stopChangeEventSource();
}
}

@@ -0,0 +1,38 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres.source.fetch;
import io.debezium.pipeline.source.spi.ChangeEventSource;
/**
* A change event source context that can stop the running source by invoking {@link
* #stopChangeEventSource()}.
*/
public class StoppableChangeEventSourceContext
implements ChangeEventSource.ChangeEventSourceContext {
private volatile boolean isRunning = true;
public void stopChangeEventSource() {
isRunning = false;
}
@Override
public boolean isRunning() {
return isRunning;
}
}
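This single shared context replaces the two near-identical inner classes deleted above (PostgresChangeEventSourceContext and StreamSplitChangeEventSourceContext). A toy illustration of the cooperative-stop pattern it provides; the thread below merely stands in for a Debezium change event source's run loop:

```java
public class StopPatternDemo {
    public static void main(String[] args) throws Exception {
        StoppableChangeEventSourceContext context = new StoppableChangeEventSourceContext();
        Thread reader =
                new Thread(
                        () -> {
                            // stands in for a change event source polling the WAL
                            while (context.isRunning()) {
                                Thread.yield();
                            }
                        });
        reader.start();
        // PostgresStreamFetchTask#close(), or StreamSplitReadTask once the split's
        // ending offset is reached, stops the loop from outside:
        context.stopChangeEventSource();
        reader.join();
    }
}
```

Because `isRunning` is volatile, the stop signal is visible to the reader thread without any additional synchronization.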

@@ -200,6 +200,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
}
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static StartupOptions getStartupOptions(ReadableConfig config) {
@@ -208,16 +210,18 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
switch (modeString.toLowerCase()) {
case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial();
case SCAN_STARTUP_MODE_VALUE_SNAPSHOT:
return StartupOptions.snapshot();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
default:
throw new ValidationException(
String.format(
"Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
"Invalid value for option '%s'. Supported values are [%s, %s, %s], but was: %s",
SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_SNAPSHOT,
SCAN_STARTUP_MODE_VALUE_LATEST,
modeString));
}
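On the Table/SQL side, the new value plugs into the existing `scan.startup.mode` option. A sketch of a DDL that selects it (the table schema and connection properties are placeholders; the option keys follow the postgres-cdc connector's documented names):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SnapshotModeDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE customers ("
                        + " id BIGINT,"
                        + " name STRING,"
                        + " address STRING,"
                        + " phone_number STRING,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'postgres-cdc',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '5432',"
                        + " 'username' = 'postgres',"
                        + " 'password' = 'postgres',"
                        + " 'database-name' = 'postgres',"
                        + " 'schema-name' = 'inventory',"
                        + " 'table-name' = 'customers',"
                        + " 'slot.name' = 'flink_slot',"
                        // the new mode added by this commit:
                        + " 'scan.startup.mode' = 'snapshot'"
                        + ")");
        // reads a bounded snapshot of the table, then the job finishes
        tEnv.executeSql("SELECT * FROM customers").print();
    }
}
```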

@@ -34,6 +34,7 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.postgres.PostgresTestBase;
@@ -89,6 +90,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
private final String scanStartupMode;
@@ -323,13 +325,81 @@ public class PostgresSourceITCase extends PostgresTestBase {
tableResult.getJobClient().get().cancel().get();
}
@Test
public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
// The table has 21 rows; set fetchSize = 22 to verify that the job is bounded.
List<String> records =
testBackfillWhenWritingEvents(
false, 22, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.snapshot());
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]");
assertEqualsInAnyOrder(expectedRecords, records);
}
@Test
public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
// The table has 21 rows; set fetchSize = 22 to verify that the job is bounded.
List<String> records =
testBackfillWhenWritingEvents(
false, 22, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.snapshot());
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[2000, user_21, Pittsburgh, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]");
// when backfill is enabled, the WAL entries between (snapshot, high_watermark) are
// applied to the snapshot image
assertEqualsInAnyOrder(expectedRecords, records);
}
@Test
public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
if (!DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
return;
}
- List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);
+ List<String> records =
+ testBackfillWhenWritingEvents(
+ false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial());
List<String> expectedRecords =
Arrays.asList(
@@ -365,7 +435,9 @@ public class PostgresSourceITCase extends PostgresTestBase {
return;
}
- List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);
+ List<String> records =
+ testBackfillWhenWritingEvents(
+ false, 21, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial());
List<String> expectedRecords =
Arrays.asList(
@@ -401,7 +473,9 @@ public class PostgresSourceITCase extends PostgresTestBase {
return;
}
- List<String> records = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK);
+ List<String> records =
+ testBackfillWhenWritingEvents(
+ true, 25, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial());
List<String> expectedRecords =
Arrays.asList(
@@ -441,7 +515,9 @@ public class PostgresSourceITCase extends PostgresTestBase {
return;
}
- List<String> records = testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK);
+ List<String> records =
+ testBackfillWhenWritingEvents(
+ true, 25, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial());
List<String> expectedRecords =
Arrays.asList(
@@ -477,7 +553,11 @@ public class PostgresSourceITCase extends PostgresTestBase {
}
private List<String> testBackfillWhenWritingEvents(
- boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
+ boolean skipSnapshotBackfill,
+ int fetchSize,
+ int hookType,
+ StartupOptions startupOptions)
+ throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setParallelism(1);
@@ -504,6 +584,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
.database(customDatabase.getDatabaseName())
.slotName(slotName)
.tableList(tableId)
.startupOptions(startupOptions)
.skipSnapshotBackfill(skipSnapshotBackfill)
.deserializer(customerTable.getDeserializer())
.build();
@@ -528,10 +609,16 @@ public class PostgresSourceITCase extends PostgresTestBase {
}
};
- if (hookType == USE_POST_LOWWATERMARK_HOOK) {
- hooks.setPostLowWatermarkAction(snapshotPhaseHook);
- } else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
- hooks.setPreHighWatermarkAction(snapshotPhaseHook);
+ switch (hookType) {
+ case USE_POST_LOWWATERMARK_HOOK:
+ hooks.setPostLowWatermarkAction(snapshotPhaseHook);
+ break;
+ case USE_PRE_HIGHWATERMARK_HOOK:
+ hooks.setPreHighWatermarkAction(snapshotPhaseHook);
+ break;
+ case USE_POST_HIGHWATERMARK_HOOK:
+ hooks.setPostHighWatermarkAction(snapshotPhaseHook);
+ break;
}
source.setSnapshotHooks(hooks);
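A usage-level consequence the new tests rely on: with `StartupOptions.snapshot()` the source is bounded, so a job over it terminates once every snapshot split has been read; that is exactly what fetchSize = 22 against 21 rows asserts. A sketch with the same placeholder builder settings as the first example:

```java
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class BoundedSnapshotCheck {
    public static void main(String[] args) throws Exception {
        PostgresIncrementalSource<String> source =
                PostgresIncrementalSource.<String>builder()
                        .hostname("localhost") // placeholders, as in the first sketch
                        .port(5432)
                        .database("postgres")
                        .schemaList("customer")
                        .tableList("customer.customers")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flink_slot")
                        .decodingPluginName("pgoutput")
                        .startupOptions(StartupOptions.snapshot())
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // executeAndCollect returns once the bounded snapshot source finishes
        List<String> rows =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "snapshot-only")
                        .executeAndCollect(1000);
        System.out.println("snapshot rows: " + rows.size());
    }
}
```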
