[FLINK-35387][cdc-connector][postgres] PG CDC source supports heartbeat

This closes #3667
Hongshun Wang 2 weeks ago committed by GitHub
parent 6b65aed4c5
commit bcd70df85c

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.base;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import java.util.Map;
/** CDC event dispatcher which provides the ability to dispatch watermark events. */
@PublicEvolving
public interface WatermarkDispatcher {
void dispatchWatermarkEvent(
Map<String, ?> sourcePartition,
SourceSplitBase sourceSplit,
Offset watermark,
WatermarkKind watermarkKind)
throws InterruptedException;
}
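
For unit tests, the new interface can be satisfied without any Debezium plumbing. A minimal hypothetical sketch (the class and its record type are illustrative, not part of this commit):

import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical test double that records every dispatched watermark. */
public class RecordingWatermarkDispatcher implements WatermarkDispatcher {

    /** One entry per dispatched watermark: split id, kind, and offset. */
    public static final class Dispatched {
        final String splitId;
        final WatermarkKind kind;
        final Offset watermark;

        Dispatched(String splitId, WatermarkKind kind, Offset watermark) {
            this.splitId = splitId;
            this.kind = kind;
            this.watermark = watermark;
        }
    }

    private final List<Dispatched> dispatched = new ArrayList<>();

    @Override
    public void dispatchWatermarkEvent(
            Map<String, ?> sourcePartition,
            SourceSplitBase sourceSplit,
            Offset watermark,
            WatermarkKind watermarkKind) {
        // No queue involved; just remember what was dispatched for assertions.
        dispatched.add(new Dispatched(sourceSplit.splitId(), watermarkKind, watermark));
    }

    public List<Dispatched> getDispatched() {
        return dispatched;
    }
}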

@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.base.relational;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.relational.handler.SchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
@ -61,7 +62,8 @@ import java.util.Map;
* this is useful for downstream consumers to deserialize the {@link HistoryRecord}.
* </pre>
*/
public class JdbcSourceEventDispatcher<P extends Partition> extends EventDispatcher<P, TableId> {
public class JdbcSourceEventDispatcher<P extends Partition> extends EventDispatcher<P, TableId>
implements WatermarkDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceEventDispatcher.class);
public static final String HISTORY_RECORD_FIELD = "historyRecord";
@ -238,6 +240,7 @@ public class JdbcSourceEventDispatcher<P extends Partition> extends EventDispatc
}
}
@Override
public void dispatchWatermarkEvent(
Map<String, ?> sourcePartition,
SourceSplitBase sourceSplit,

@ -135,7 +135,7 @@ public abstract class AbstractScanFetchTask implements FetchTask {
Context context, SourceSplitBase split, Offset lowWatermark) throws Exception {
if (context instanceof JdbcSourceFetchTaskContext) {
((JdbcSourceFetchTaskContext) context)
.getDispatcher()
.getWaterMarkDispatcher()
.dispatchWatermarkEvent(
((JdbcSourceFetchTaskContext) context)
.getPartition()
@ -157,7 +157,7 @@ public abstract class AbstractScanFetchTask implements FetchTask {
Context context, SourceSplitBase split, Offset highWatermark) throws Exception {
if (context instanceof JdbcSourceFetchTaskContext) {
((JdbcSourceFetchTaskContext) context)
.getDispatcher()
.getWaterMarkDispatcher()
.dispatchWatermarkEvent(
((JdbcSourceFetchTaskContext) context)
.getPartition()
@ -180,7 +180,7 @@ public abstract class AbstractScanFetchTask implements FetchTask {
Context context, SourceSplitBase split, Offset endWatermark) throws Exception {
if (context instanceof JdbcSourceFetchTaskContext) {
((JdbcSourceFetchTaskContext) context)
.getDispatcher()
.getWaterMarkDispatcher()
.dispatchWatermarkEvent(
((JdbcSourceFetchTaskContext) context)
.getPartition()
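
The three call sites above differ only in the watermark kind they emit (low, high, end). A condensed hypothetical sketch of the shared shape (the helper name is illustrative; it assumes the elided lines continue with getSourcePartition(), as partition.getSourcePartition() does in the stream tasks below):

// Hypothetical helper condensing the three dispatch sites above; the real
// code inlines this once per watermark kind.
private static void dispatchWatermark(
        JdbcSourceFetchTaskContext context,
        SourceSplitBase split,
        Offset watermark,
        WatermarkKind kind)
        throws InterruptedException {
    context.getWaterMarkDispatcher()
            .dispatchWatermarkEvent(
                    // The same source-partition map that the split's change
                    // events carry, so readers can correlate the signals.
                    context.getPartition().getSourcePartition(),
                    split,
                    watermark,
                    kind);
}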

@ -17,16 +17,18 @@
package org.apache.flink.cdc.connectors.base.source.reader.external;
import org.apache.flink.annotation.Internal;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.table.types.logical.RowType;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseSchema;
@ -43,6 +45,7 @@ import java.util.Map;
import java.util.stream.Collectors;
/** The context for the fetch task that fetches snapshot-split data from a JDBC data source. */
@Internal
public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
protected final JdbcSourceConfig sourceConfig;
@ -171,7 +174,9 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
public abstract ErrorHandler getErrorHandler();
public abstract JdbcSourceEventDispatcher getDispatcher();
public abstract EventDispatcher getEventDispatcher();
public abstract WatermarkDispatcher getWaterMarkDispatcher();
public abstract OffsetContext getOffsetContext();
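
The single getDispatcher() accessor is split in two: getEventDispatcher() serves Debezium's data and snapshot events, while getWaterMarkDispatcher() serves Flink CDC's watermark signal records. Every connector context in this diff satisfies both getters with one object: Db2, Oracle, and SqlServer return their JdbcSourceEventDispatcher, which now implements WatermarkDispatcher, and Postgres returns the new CDCPostgresDispatcher introduced below.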

@ -17,7 +17,6 @@
package org.apache.flink.cdc.connectors.db2.source.fetch;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
@ -74,7 +73,7 @@ public class Db2ScanFetchTask extends AbstractScanFetchTask {
sourceFetchContext.getSnapshotChangeEventSourceMetrics(),
sourceFetchContext.getDatabaseSchema(),
sourceFetchContext.getConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getEventDispatcher(),
sourceFetchContext.getSnapshotReceiver(),
snapshotSplit);
Db2SnapshotSplitChangeEventSourceContext changeEventSourceContext =
@ -132,7 +131,8 @@ public class Db2ScanFetchTask extends AbstractScanFetchTask {
new Db2ConnectorConfig(dezConf),
context.getConnection(),
context.getMetaDataConnection(),
context.getDispatcher(),
context.getEventDispatcher(),
context.getWaterMarkDispatcher(),
context.getErrorHandler(),
context.getDatabaseSchema(),
backfillBinlogSplit);
@ -150,7 +150,7 @@ public class Db2ScanFetchTask extends AbstractScanFetchTask {
private final Db2ConnectorConfig connectorConfig;
private final Db2DatabaseSchema databaseSchema;
private final Db2Connection jdbcConnection;
private final JdbcSourceEventDispatcher<Db2Partition> dispatcher;
private final EventDispatcher<Db2Partition, TableId> dispatcher;
private final Clock clock;
private final SnapshotSplit snapshotSplit;
private final Db2OffsetContext offsetContext;
@ -163,7 +163,7 @@ public class Db2ScanFetchTask extends AbstractScanFetchTask {
SnapshotProgressListener<Db2Partition> snapshotProgressListener,
Db2DatabaseSchema databaseSchema,
Db2Connection jdbcConnection,
JdbcSourceEventDispatcher<Db2Partition> dispatcher,
EventDispatcher<Db2Partition, TableId> dispatcher,
EventDispatcher.SnapshotReceiver<Db2Partition> snapshotReceiver,
SnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotProgressListener);

@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.db2.source.fetch;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
@ -216,7 +217,12 @@ public class Db2SourceFetchTaskContext extends JdbcSourceFetchTaskContext {
}
@Override
public JdbcSourceEventDispatcher<Db2Partition> getDispatcher() {
public JdbcSourceEventDispatcher getEventDispatcher() {
return dispatcher;
}
@Override
public WatermarkDispatcher getWaterMarkDispatcher() {
return dispatcher;
}

@ -17,7 +17,7 @@
package org.apache.flink.cdc.connectors.db2.source.fetch;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
@ -34,7 +34,9 @@ import io.debezium.connector.db2.Db2Partition;
import io.debezium.connector.db2.Db2StreamingChangeEventSource;
import io.debezium.connector.db2.Lsn;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,7 +63,8 @@ public class Db2StreamFetchTask implements FetchTask<SourceSplitBase> {
sourceFetchContext.getDbzConnectorConfig(),
sourceFetchContext.getConnection(),
sourceFetchContext.getMetaDataConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getEventDispatcher(),
sourceFetchContext.getWaterMarkDispatcher(),
sourceFetchContext.getErrorHandler(),
sourceFetchContext.getDatabaseSchema(),
split);
@ -96,7 +99,7 @@ public class Db2StreamFetchTask implements FetchTask<SourceSplitBase> {
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
private final StreamSplit lsnSplit;
private final JdbcSourceEventDispatcher<Db2Partition> dispatcher;
private final WatermarkDispatcher watermarkDispatcher;
private final ErrorHandler errorHandler;
private ChangeEventSourceContext context;
@ -104,7 +107,8 @@ public class Db2StreamFetchTask implements FetchTask<SourceSplitBase> {
Db2ConnectorConfig connectorConfig,
Db2Connection connection,
Db2Connection metadataConnection,
JdbcSourceEventDispatcher<Db2Partition> dispatcher,
EventDispatcher<Db2Partition, TableId> eventDispatcher,
WatermarkDispatcher watermarkDispatcher,
ErrorHandler errorHandler,
Db2DatabaseSchema schema,
StreamSplit lsnSplit) {
@ -112,12 +116,12 @@ public class Db2StreamFetchTask implements FetchTask<SourceSplitBase> {
connectorConfig,
connection,
metadataConnection,
dispatcher,
eventDispatcher,
errorHandler,
Clock.system(),
schema);
this.lsnSplit = lsnSplit;
this.dispatcher = dispatcher;
this.watermarkDispatcher = watermarkDispatcher;
this.errorHandler = errorHandler;
}
@ -130,7 +134,7 @@ public class Db2StreamFetchTask implements FetchTask<SourceSplitBase> {
if (currentLsnOffset.isAtOrAfter(endingOffset)) {
// send streaming end event
try {
dispatcher.dispatchWatermarkEvent(
watermarkDispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
lsnSplit,
currentLsnOffset,

@ -18,7 +18,7 @@
package org.apache.flink.cdc.connectors.oracle.source.reader.fetch;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
@ -36,7 +36,9 @@ import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfini
import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,7 +58,8 @@ public class EventProcessorFactory {
ChangeEventSource.ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
JdbcSourceEventDispatcher<OraclePartition> dispatcher,
EventDispatcher<OraclePartition, TableId> eventDispatcher,
WatermarkDispatcher watermarkDispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
@ -70,7 +73,8 @@ public class EventProcessorFactory {
context,
connectorConfig,
jdbcConnection,
dispatcher,
eventDispatcher,
watermarkDispatcher,
partition,
offsetContext,
schema,
@ -83,7 +87,8 @@ public class EventProcessorFactory {
context,
connectorConfig,
jdbcConnection,
dispatcher,
eventDispatcher,
watermarkDispatcher,
partition,
offsetContext,
schema,
@ -95,7 +100,8 @@ public class EventProcessorFactory {
context,
connectorConfig,
jdbcConnection,
dispatcher,
eventDispatcher,
watermarkDispatcher,
partition,
offsetContext,
schema,
@ -117,13 +123,14 @@ public class EventProcessorFactory {
private final ErrorHandler errorHandler;
private ChangeEventSource.ChangeEventSourceContext context;
private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
private final WatermarkDispatcher watermarkDispatcher;
public CDCMemoryLogMinerEventProcessor(
ChangeEventSource.ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
JdbcSourceEventDispatcher<OraclePartition> dispatcher,
EventDispatcher<OraclePartition, TableId> eventDispatcher,
WatermarkDispatcher watermarkDispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
@ -134,7 +141,7 @@ public class EventProcessorFactory {
context,
connectorConfig,
jdbcConnection,
dispatcher,
eventDispatcher,
partition,
offsetContext,
schema,
@ -142,14 +149,14 @@ public class EventProcessorFactory {
this.redoLogSplit = redoLogSplit;
this.errorHandler = errorHandler;
this.context = context;
this.dispatcher = dispatcher;
this.watermarkDispatcher = watermarkDispatcher;
}
@Override
protected void processRow(OraclePartition partition, LogMinerEventRow row)
throws SQLException, InterruptedException {
if (reachEndingOffset(
partition, row, redoLogSplit, errorHandler, dispatcher, context)) {
partition, row, redoLogSplit, errorHandler, watermarkDispatcher, context)) {
return;
}
super.processRow(partition, row);
@ -166,13 +173,14 @@ public class EventProcessorFactory {
private final ErrorHandler errorHandler;
private ChangeEventSource.ChangeEventSourceContext context;
private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
private final WatermarkDispatcher watermarkDispatcher;
public CDCEmbeddedInfinispanLogMinerEventProcessor(
ChangeEventSource.ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
JdbcSourceEventDispatcher<OraclePartition> dispatcher,
EventDispatcher<OraclePartition, TableId> eventDispatcher,
WatermarkDispatcher watermarkDispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
@ -183,7 +191,7 @@ public class EventProcessorFactory {
context,
connectorConfig,
jdbcConnection,
dispatcher,
eventDispatcher,
partition,
offsetContext,
schema,
@ -191,14 +199,14 @@ public class EventProcessorFactory {
this.redoLogSplit = redoLogSplit;
this.errorHandler = errorHandler;
this.context = context;
this.dispatcher = dispatcher;
this.watermarkDispatcher = watermarkDispatcher;
}
@Override
protected void processRow(OraclePartition partition, LogMinerEventRow row)
throws SQLException, InterruptedException {
if (reachEndingOffset(
partition, row, redoLogSplit, errorHandler, dispatcher, context)) {
partition, row, redoLogSplit, errorHandler, watermarkDispatcher, context)) {
return;
}
super.processRow(partition, row);
@ -215,13 +223,14 @@ public class EventProcessorFactory {
private final ErrorHandler errorHandler;
private ChangeEventSource.ChangeEventSourceContext context;
private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
private final WatermarkDispatcher watermarkDispatcher;
public CDCRemoteInfinispanLogMinerEventProcessor(
ChangeEventSource.ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
JdbcSourceEventDispatcher<OraclePartition> dispatcher,
EventDispatcher<OraclePartition, TableId> eventDispatcher,
WatermarkDispatcher watermarkDispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
@ -232,7 +241,7 @@ public class EventProcessorFactory {
context,
connectorConfig,
jdbcConnection,
dispatcher,
eventDispatcher,
partition,
offsetContext,
schema,
@ -240,14 +249,14 @@ public class EventProcessorFactory {
this.redoLogSplit = redoLogSplit;
this.errorHandler = errorHandler;
this.context = context;
this.dispatcher = dispatcher;
this.watermarkDispatcher = watermarkDispatcher;
}
@Override
protected void processRow(OraclePartition partition, LogMinerEventRow row)
throws SQLException, InterruptedException {
if (reachEndingOffset(
partition, row, redoLogSplit, errorHandler, dispatcher, context)) {
partition, row, redoLogSplit, errorHandler, watermarkDispatcher, context)) {
return;
}
super.processRow(partition, row);
@ -259,7 +268,7 @@ public class EventProcessorFactory {
LogMinerEventRow row,
StreamSplit redoLogSplit,
ErrorHandler errorHandler,
JdbcSourceEventDispatcher dispatcher,
WatermarkDispatcher dispatcher,
ChangeEventSource.ChangeEventSourceContext context) {
// Check whether we need to stop fetching the redo log for the snapshot split.
if (isBoundedRead(redoLogSplit)) {

@ -17,7 +17,6 @@
package org.apache.flink.cdc.connectors.oracle.source.reader.fetch;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
@ -74,7 +73,7 @@ public class OracleScanFetchTask extends AbstractScanFetchTask {
sourceFetchContext.getSnapshotChangeEventSourceMetrics(),
sourceFetchContext.getDatabaseSchema(),
sourceFetchContext.getConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getEventDispatcher(),
snapshotSplit);
StoppableChangeEventSourceContext changeEventSourceContext =
new StoppableChangeEventSourceContext();
@ -136,7 +135,8 @@ public class OracleScanFetchTask extends AbstractScanFetchTask {
return new RedoLogSplitReadTask(
new OracleConnectorConfig(dezConf),
context.getConnection(),
context.getDispatcher(),
context.getEventDispatcher(),
context.getWaterMarkDispatcher(),
context.getErrorHandler(),
context.getDatabaseSchema(),
context.getSourceConfig().getOriginDbzConnectorConfig(),
@ -157,7 +157,7 @@ public class OracleScanFetchTask extends AbstractScanFetchTask {
private final OracleConnectorConfig connectorConfig;
private final OracleDatabaseSchema databaseSchema;
private final OracleConnection jdbcConnection;
private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
private final EventDispatcher<OraclePartition, TableId> eventDispatcher;
private final Clock clock;
private final SnapshotSplit snapshotSplit;
private final OracleOffsetContext offsetContext;
@ -169,14 +169,14 @@ public class OracleScanFetchTask extends AbstractScanFetchTask {
SnapshotProgressListener<OraclePartition> snapshotProgressListener,
OracleDatabaseSchema databaseSchema,
OracleConnection jdbcConnection,
JdbcSourceEventDispatcher<OraclePartition> dispatcher,
EventDispatcher<OraclePartition, TableId> eventDispatcher,
SnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotProgressListener);
this.offsetContext = previousOffset;
this.connectorConfig = connectorConfig;
this.databaseSchema = databaseSchema;
this.jdbcConnection = jdbcConnection;
this.dispatcher = dispatcher;
this.eventDispatcher = eventDispatcher;
this.clock = Clock.SYSTEM;
this.snapshotSplit = snapshotSplit;
this.snapshotProgressListener = snapshotProgressListener;
@ -243,7 +243,7 @@ public class OracleScanFetchTask extends AbstractScanFetchTask {
private void createDataEvents(OracleSnapshotContext snapshotContext, TableId tableId)
throws Exception {
EventDispatcher.SnapshotReceiver<OraclePartition> snapshotReceiver =
dispatcher.getSnapshotChangeEventReceiver();
eventDispatcher.getSnapshotChangeEventReceiver();
LOG.debug("Snapshotting table {}", tableId);
createDataEventsForTable(
snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
@ -306,7 +306,7 @@ public class OracleScanFetchTask extends AbstractScanFetchTask {
snapshotContext.partition, table.id(), rows);
logTimer = getTableScanLogTimer();
}
dispatcher.dispatchSnapshotEvent(
eventDispatcher.dispatchSnapshotEvent(
snapshotContext.partition,
table.id(),
getChangeRecordEmitter(snapshotContext, table.id(), row),

@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.oracle.source.reader.fetch;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
@ -230,7 +231,12 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
}
@Override
public JdbcSourceEventDispatcher<OraclePartition> getDispatcher() {
public JdbcSourceEventDispatcher<OraclePartition> getEventDispatcher() {
return dispatcher;
}
@Override
public WatermarkDispatcher getWaterMarkDispatcher() {
return dispatcher;
}

@ -18,7 +18,7 @@
package org.apache.flink.cdc.connectors.oracle.source.reader.fetch;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
@ -33,6 +33,8 @@ import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,7 +58,8 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
new RedoLogSplitReadTask(
sourceFetchContext.getDbzConnectorConfig(),
sourceFetchContext.getConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getEventDispatcher(),
sourceFetchContext.getWaterMarkDispatcher(),
sourceFetchContext.getErrorHandler(),
sourceFetchContext.getDatabaseSchema(),
sourceFetchContext.getSourceConfig().getOriginDbzConnectorConfig(),
@ -93,9 +96,9 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
private static final Logger LOG = LoggerFactory.getLogger(RedoLogSplitReadTask.class);
private final StreamSplit redoLogSplit;
private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
EventDispatcher<OraclePartition, TableId> eventDispatcher;
private final WatermarkDispatcher watermarkDispatcher;
private final ErrorHandler errorHandler;
private ChangeEventSourceContext context;
private final OracleConnectorConfig connectorConfig;
private final OracleConnection connection;
@ -106,7 +109,8 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
public RedoLogSplitReadTask(
OracleConnectorConfig connectorConfig,
OracleConnection connection,
JdbcSourceEventDispatcher<OraclePartition> dispatcher,
EventDispatcher<OraclePartition, TableId> eventDispatcher,
WatermarkDispatcher watermarkDispatcher,
ErrorHandler errorHandler,
OracleDatabaseSchema schema,
Configuration jdbcConfig,
@ -115,14 +119,15 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
super(
connectorConfig,
connection,
dispatcher,
eventDispatcher,
errorHandler,
Clock.SYSTEM,
schema,
jdbcConfig,
metrics);
this.redoLogSplit = redoLogSplit;
this.dispatcher = dispatcher;
this.eventDispatcher = eventDispatcher;
this.watermarkDispatcher = watermarkDispatcher;
this.errorHandler = errorHandler;
this.connectorConfig = connectorConfig;
this.connection = connection;
@ -135,7 +140,6 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
ChangeEventSourceContext context,
OraclePartition partition,
OracleOffsetContext offsetContext) {
this.context = context;
super.execute(context, partition, offsetContext);
}
@ -152,7 +156,8 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
context,
connectorConfig,
connection,
dispatcher,
eventDispatcher,
watermarkDispatcher,
partition,
offsetContext,
schema,

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.postgres.source.fetch;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresEventDispatcher;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Map;
/** Postgres dispatcher for the CDC source, with watermark dispatching support. */
public class CDCPostgresDispatcher extends PostgresEventDispatcher<TableId>
implements WatermarkDispatcher {
private final String topic;
private final ChangeEventQueue<DataChangeEvent> queue;
public CDCPostgresDispatcher(
PostgresConnectorConfig connectorConfig,
TopicSelector topicSelector,
DatabaseSchema schema,
ChangeEventQueue queue,
DataCollectionFilters.DataCollectionFilter filter,
ChangeEventCreator changeEventCreator,
EventMetadataProvider metadataProvider,
HeartbeatFactory heartbeatFactory,
SchemaNameAdjuster schemaNameAdjuster) {
super(
connectorConfig,
topicSelector,
schema,
queue,
filter,
changeEventCreator,
metadataProvider,
heartbeatFactory,
schemaNameAdjuster);
this.topic = topicSelector.getPrimaryTopic();
this.queue = queue;
}
@Override
public void dispatchWatermarkEvent(
Map<String, ?> sourcePartition,
SourceSplitBase sourceSplit,
Offset watermark,
WatermarkKind watermarkKind)
throws InterruptedException {
SourceRecord sourceRecord =
WatermarkEvent.create(
sourcePartition, topic, sourceSplit.splitId(), watermarkKind, watermark);
queue.enqueue(new DataChangeEvent(sourceRecord));
}
}
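
Unlike the other connectors, the Postgres source cannot reuse JdbcSourceEventDispatcher here: its streaming engine is built on Debezium's PostgresEventDispatcher, which owns the heartbeat machinery this commit enables. Extending that class and implementing WatermarkDispatcher directly lets watermark signal records share the same ChangeEventQueue as ordinary change events and heartbeats.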

@ -17,7 +17,6 @@
package org.apache.flink.cdc.connectors.postgres.source.fetch;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
@ -28,6 +27,7 @@ import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresEventDispatcher;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
@ -102,7 +102,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
ctx.getDbzConnectorConfig(),
ctx.getDatabaseSchema(),
ctx.getOffsetContext(),
ctx.getDispatcher(),
ctx.getEventDispatcher(),
ctx.getSnapshotChangeEventSourceMetrics(),
snapshotSplit);
@ -135,8 +135,8 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
ctx.getDbzConnectorConfig(),
ctx.getSnapShotter(),
ctx.getConnection(),
ctx.getDispatcher(),
ctx.getPostgresDispatcher(),
ctx.getEventDispatcher(),
ctx.getWaterMarkDispatcher(),
ctx.getErrorHandler(),
ctx.getTaskContext().getClock(),
ctx.getDatabaseSchema(),
@ -215,7 +215,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
private final PostgresConnection jdbcConnection;
private final PostgresConnectorConfig connectorConfig;
private final JdbcSourceEventDispatcher<PostgresPartition> dispatcher;
private final PostgresEventDispatcher<TableId> eventDispatcher;
private final SnapshotSplit snapshotSplit;
private final PostgresOffsetContext offsetContext;
private final PostgresSchema databaseSchema;
@ -227,7 +227,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
PostgresConnectorConfig connectorConfig,
PostgresSchema databaseSchema,
PostgresOffsetContext previousOffset,
JdbcSourceEventDispatcher dispatcher,
PostgresEventDispatcher<TableId> eventDispatcher,
SnapshotProgressListener snapshotProgressListener,
SnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotProgressListener);
@ -235,7 +235,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
this.connectorConfig = connectorConfig;
this.snapshotProgressListener = snapshotProgressListener;
this.databaseSchema = databaseSchema;
this.dispatcher = dispatcher;
this.eventDispatcher = eventDispatcher;
this.snapshotSplit = snapshotSplit;
this.offsetContext = previousOffset;
this.clock = Clock.SYSTEM;
@ -260,7 +260,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId tableId)
throws InterruptedException {
EventDispatcher.SnapshotReceiver<PostgresPartition> snapshotReceiver =
dispatcher.getSnapshotChangeEventReceiver();
eventDispatcher.getSnapshotChangeEventReceiver();
LOG.info("Snapshotting table {}", tableId);
createDataEventsForTable(
snapshotContext,
@ -337,7 +337,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask {
SnapshotChangeRecordEmitter<PostgresPartition> emitter =
new SnapshotChangeRecordEmitter<>(
snapshotContext.partition, snapshotContext.offset, row, clock);
dispatcher.dispatchSnapshotEvent(
eventDispatcher.dispatchSnapshotEvent(
snapshotContext.partition, table.id(), emitter, snapshotReceiver);
}
LOG.info(

@ -17,8 +17,8 @@
package org.apache.flink.cdc.connectors.postgres.source.fetch;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
@ -26,13 +26,13 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.handler.PostgresSchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
import org.apache.flink.table.types.logical.RowType;
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresErrorHandler;
@ -48,6 +48,7 @@ import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
@ -60,6 +61,7 @@ import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.TopicSelector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -90,8 +92,7 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private PostgresPartition partition;
private PostgresSchema schema;
private ErrorHandler errorHandler;
private JdbcSourceEventDispatcher<PostgresPartition> dispatcher;
private PostgresEventDispatcher<TableId> postgresDispatcher;
private CDCPostgresDispatcher postgresDispatcher;
private EventMetadataProvider metadataProvider;
private SnapshotChangeEventSourceMetrics<PostgresPartition> snapshotChangeEventSourceMetrics;
private Snapshotter snapShotter;
@ -215,27 +216,48 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
this.errorHandler = new PostgresErrorHandler(getDbzConnectorConfig(), queue);
this.metadataProvider = PostgresObjectUtils.newEventMetadataProvider();
this.dispatcher =
new JdbcSourceEventDispatcher<>(
dbzConfig,
topicSelector,
schema,
queue,
dbzConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
schemaNameAdjuster,
new PostgresSchemaChangeEventHandler());
PostgresConnectorConfig finalDbzConfig = dbzConfig;
this.postgresDispatcher =
new PostgresEventDispatcher<>(
dbzConfig,
new CDCPostgresDispatcher(
finalDbzConfig,
topicSelector,
schema,
queue,
dbzConfig.getTableFilters().dataCollectionFilter(),
finalDbzConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
new HeartbeatFactory<>(
dbzConfig,
topicSelector,
schemaNameAdjuster,
() ->
new PostgresConnection(
finalDbzConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_GENERAL),
exception -> {
String sqlErrorId = exception.getSQLState();
switch (sqlErrorId) {
case "57P01":
// Postgres error admin_shutdown, see
// https://www.postgresql.org/docs/12/errcodes-appendix.html
throw new DebeziumException(
"Could not execute heartbeat action query (Error: "
+ sqlErrorId
+ ")",
exception);
case "57P03":
// Postgres error cannot_connect_now, see
// https://www.postgresql.org/docs/12/errcodes-appendix.html
throw new RetriableException(
"Could not execute heartbeat action query (Error: "
+ sqlErrorId
+ ")",
exception);
default:
break;
}
}),
schemaNameAdjuster);
ChangeEventSourceMetricsFactory<PostgresPartition> metricsFactory =
@ -261,11 +283,12 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
}
@Override
public JdbcSourceEventDispatcher<PostgresPartition> getDispatcher() {
return dispatcher;
public PostgresEventDispatcher<TableId> getEventDispatcher() {
return postgresDispatcher;
}
public PostgresEventDispatcher<TableId> getPostgresDispatcher() {
@Override
public WatermarkDispatcher getWaterMarkDispatcher() {
return postgresDispatcher;
}

@ -17,7 +17,7 @@
package org.apache.flink.cdc.connectors.postgres.source.fetch;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
@ -84,8 +84,8 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
sourceFetchContext.getDbzConnectorConfig(),
sourceFetchContext.getSnapShotter(),
sourceFetchContext.getConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getPostgresDispatcher(),
sourceFetchContext.getEventDispatcher(),
sourceFetchContext.getWaterMarkDispatcher(),
sourceFetchContext.getErrorHandler(),
sourceFetchContext.getTaskContext().getClock(),
sourceFetchContext.getDatabaseSchema(),
@ -166,7 +166,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource {
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
private final StreamSplit streamSplit;
private final JdbcSourceEventDispatcher<PostgresPartition> dispatcher;
private final WatermarkDispatcher watermarkDispatcher;
private final ErrorHandler errorHandler;
public ChangeEventSourceContext context;
@ -176,8 +176,8 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
PostgresConnectorConfig connectorConfig,
Snapshotter snapshotter,
PostgresConnection connection,
JdbcSourceEventDispatcher<PostgresPartition> dispatcher,
PostgresEventDispatcher<TableId> postgresEventDispatcher,
PostgresEventDispatcher<TableId> eventDispatcher,
WatermarkDispatcher watermarkDispatcher,
ErrorHandler errorHandler,
Clock clock,
PostgresSchema schema,
@ -189,14 +189,14 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
connectorConfig,
snapshotter,
connection,
postgresEventDispatcher,
eventDispatcher,
errorHandler,
clock,
schema,
taskContext,
replicationConnection);
this.streamSplit = streamSplit;
this.dispatcher = dispatcher;
this.watermarkDispatcher = watermarkDispatcher;
this.errorHandler = errorHandler;
}
@ -218,7 +218,7 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
LOG.debug("StreamSplit is bounded read: {}", streamSplit);
final PostgresOffset currentOffset = PostgresOffset.of(offsetContext.getOffset());
try {
dispatcher.dispatchWatermarkEvent(
watermarkDispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
streamSplit,
currentOffset,

@ -44,6 +44,7 @@ import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Before;
@ -74,6 +75,7 @@ import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** IT tests for {@link PostgresSourceBuilder.PostgresIncrementalSource}. */
@ -253,9 +255,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
PostgresTestUtils.FailoverType.NONE,
PostgresTestUtils.FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
RestartStrategies.noRestart(),
false,
null);
RestartStrategies.noRestart());
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
@ -272,9 +272,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
PostgresTestUtils.FailoverType.NONE,
PostgresTestUtils.FailoverPhase.NEVER,
new String[] {"customers_no_pk"},
RestartStrategies.noRestart(),
false,
null);
RestartStrategies.noRestart());
}
}
@ -287,8 +285,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
PostgresTestUtils.FailoverPhase.SNAPSHOT,
new String[] {"customers"},
RestartStrategies.fixedDelayRestart(1, 0),
true,
null);
Collections.singletonMap("scan.incremental.snapshot.backfill.skip", "true"));
}
@Test
@ -660,8 +657,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
PostgresTestUtils.FailoverPhase.NEVER,
new String[] {"customers"},
RestartStrategies.noRestart(),
false,
chunkColumn);
Collections.singletonMap(
"scan.incremental.snapshot.chunk.key-column", chunkColumn));
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
@ -673,6 +670,34 @@ public class PostgresSourceITCase extends PostgresTestBase {
}
}
@Test
public void testHeartBeat() throws Exception {
try (PostgresConnection connection = getConnection()) {
connection.execute("CREATE TABLE IF NOT EXISTS heart_beat_table(a int)");
connection.commit();
}
TableId tableId = new TableId(null, "public", "heart_beat_table");
try (PostgresConnection connection = getConnection()) {
assertEquals(0, getCountOfTable(connection, tableId));
}
Map<String, String> options = new HashMap<>();
options.put("heartbeat.interval.ms", "100");
options.put("debezium.heartbeat.action.query", "INSERT INTO heart_beat_table VALUES(1)");
testPostgresParallelSource(
1,
scanStartupMode,
PostgresTestUtils.FailoverType.NONE,
PostgresTestUtils.FailoverPhase.NEVER,
new String[] {"customers"},
RestartStrategies.noRestart(),
options);
try (PostgresConnection connection = getConnection()) {
assertTrue(getCountOfTable(connection, tableId) > 0);
}
}
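
The two options exercised above map directly onto user-facing table options. A hypothetical DDL sketch in the style of the test fixture (connection values are placeholders; only the two heartbeat options come from this commit's test):

String sourceDDL =
        "CREATE TABLE customers ("
                + " id BIGINT NOT NULL,"
                + " name STRING,"
                + " PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'postgres-cdc',"
                + " 'hostname' = 'localhost',"
                + " 'port' = '5432',"
                + " 'username' = 'postgres',"
                + " 'password' = 'postgres',"
                + " 'database-name' = 'postgres',"
                + " 'schema-name' = 'public',"
                + " 'table-name' = 'customers',"
                + " 'slot.name' = 'flink',"
                // Emit a heartbeat record every 100 ms even when captured tables are idle.
                + " 'heartbeat.interval.ms' = '100',"
                // Heartbeat action: insert into a dummy table so the replication
                // slot's confirmed LSN keeps advancing on an otherwise idle database.
                + " 'debezium.heartbeat.action.query' = 'INSERT INTO heart_beat_table VALUES(1)'"
                + ")";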
private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill,
int fetchSize,
@ -776,8 +801,25 @@ public class PostgresSourceITCase extends PostgresTestBase {
failoverPhase,
captureCustomerTables,
RestartStrategies.fixedDelayRestart(1, 0),
false,
null);
new HashMap<>());
}
private void testPostgresParallelSource(
int parallelism,
String scanStartupMode,
PostgresTestUtils.FailoverType failoverType,
PostgresTestUtils.FailoverPhase failoverPhase,
String[] captureCustomerTables,
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)
throws Exception {
testPostgresParallelSource(
parallelism,
scanStartupMode,
failoverType,
failoverPhase,
captureCustomerTables,
restartStrategyConfiguration,
new HashMap<>());
}
private void testPostgresParallelSource(
@ -787,8 +829,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
PostgresTestUtils.FailoverPhase failoverPhase,
String[] captureCustomerTables,
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
boolean skipSnapshotBackfill,
String chunkColumn)
Map<String, String> otherOptions)
throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@ -817,9 +858,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s',"
+ " 'scan.lsn-commit.checkpoints-num-delay' = '1'"
+ ""
+ "%s"
+ ")",
customDatabase.getHost(),
customDatabase.getDatabasePort(),
@ -830,12 +870,16 @@ public class PostgresSourceITCase extends PostgresTestBase {
getTableNameRegex(captureCustomerTables),
scanStartupMode,
slotName,
skipSnapshotBackfill,
chunkColumn == null
otherOptions.isEmpty()
? ""
: ",'scan.incremental.snapshot.chunk.key-column'='"
+ chunkColumn
+ "'");
: ","
+ otherOptions.entrySet().stream()
.map(
e ->
String.format(
"'%s'='%s'",
e.getKey(), e.getValue()))
.collect(Collectors.joining(",")));
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
@ -1163,4 +1207,23 @@ public class PostgresSourceITCase extends PostgresTestBase {
properties.put("dbname", customDatabase.getDatabaseName());
return createConnection(properties);
}
private static long getCountOfTable(JdbcConnection jdbc, TableId tableId) throws SQLException {
// COUNT(1) returns an exact row count but can be slow on large tables; the
// approximate pg_class-based lookup described in the link below is more
// efficient for large tables, but requires ANALYZE or VACUUM to be run
// first in PostgreSQL.
// https://stackoverflow.com/questions/7943233/fast-way-to-discover-the-row-count-of-a-table-in-postgresql
final String query = String.format("SELECT COUNT(1) FROM %s", tableId.toString());
return jdbc.queryAndMap(
query,
rs -> {
if (!rs.next()) {
throw new SQLException(
String.format(
"No result returned after running query [%s]", query));
}
return rs.getLong(1);
});
}
}

@ -17,7 +17,6 @@
package org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
@ -75,7 +74,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask {
sourceFetchContext.getSnapshotChangeEventSourceMetrics(),
sourceFetchContext.getDatabaseSchema(),
sourceFetchContext.getConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getEventDispatcher(),
sourceFetchContext.getSnapshotReceiver(),
snapshotSplit);
SqlServerSnapshotSplitChangeEventSourceContext changeEventSourceContext =
@ -134,7 +133,8 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask {
new SqlServerConnectorConfig(dezConf),
context.getConnection(),
context.getMetaDataConnection(),
context.getDispatcher(),
context.getEventDispatcher(),
context.getWaterMarkDispatcher(),
context.getErrorHandler(),
context.getDatabaseSchema(),
backfillBinlogSplit);
@ -153,7 +153,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask {
private final SqlServerConnectorConfig connectorConfig;
private final SqlServerDatabaseSchema databaseSchema;
private final SqlServerConnection jdbcConnection;
private final JdbcSourceEventDispatcher<SqlServerPartition> dispatcher;
private final EventDispatcher<SqlServerPartition, TableId> eventDispatcher;
private final Clock clock;
private final SnapshotSplit snapshotSplit;
private final SqlServerOffsetContext offsetContext;
@ -166,7 +166,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask {
SnapshotProgressListener<SqlServerPartition> snapshotProgressListener,
SqlServerDatabaseSchema databaseSchema,
SqlServerConnection jdbcConnection,
JdbcSourceEventDispatcher<SqlServerPartition> dispatcher,
EventDispatcher<SqlServerPartition, TableId> eventDispatcher,
EventDispatcher.SnapshotReceiver<SqlServerPartition> snapshotReceiver,
SnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotProgressListener);
@ -174,7 +174,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask {
this.connectorConfig = connectorConfig;
this.databaseSchema = databaseSchema;
this.jdbcConnection = jdbcConnection;
this.dispatcher = dispatcher;
this.eventDispatcher = eventDispatcher;
this.clock = Clock.SYSTEM;
this.snapshotSplit = snapshotSplit;
this.snapshotProgressListener = snapshotProgressListener;
@ -296,7 +296,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask {
snapshotContext.partition, table.id(), rows);
logTimer = getTableScanLogTimer();
}
dispatcher.dispatchSnapshotEvent(
eventDispatcher.dispatchSnapshotEvent(
snapshotContext.partition,
table.id(),
getChangeRecordEmitter(snapshotContext, table.id(), row),

@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
@ -229,7 +230,12 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
}
@Override
public JdbcSourceEventDispatcher<SqlServerPartition> getDispatcher() {
public JdbcSourceEventDispatcher<SqlServerPartition> getEventDispatcher() {
return dispatcher;
}
@Override
public WatermarkDispatcher getWaterMarkDispatcher() {
return dispatcher;
}

@ -17,7 +17,7 @@
package org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
@ -35,7 +35,9 @@ import io.debezium.connector.sqlserver.SqlServerOffsetContext;
import io.debezium.connector.sqlserver.SqlServerPartition;
import io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,7 +65,8 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
sourceFetchContext.getDbzConnectorConfig(),
sourceFetchContext.getConnection(),
sourceFetchContext.getMetaDataConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getEventDispatcher(),
sourceFetchContext.getWaterMarkDispatcher(),
sourceFetchContext.getErrorHandler(),
sourceFetchContext.getDatabaseSchema(),
split);
@ -98,7 +101,7 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
private final StreamSplit lsnSplit;
private final JdbcSourceEventDispatcher<SqlServerPartition> dispatcher;
private final WatermarkDispatcher watermarkDispatcher;
private final ErrorHandler errorHandler;
private ChangeEventSourceContext context;
@ -106,7 +109,8 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
SqlServerConnectorConfig connectorConfig,
SqlServerConnection connection,
SqlServerConnection metadataConnection,
JdbcSourceEventDispatcher<SqlServerPartition> dispatcher,
EventDispatcher<SqlServerPartition, TableId> eventDispatcher,
WatermarkDispatcher watermarkDispatcher,
ErrorHandler errorHandler,
SqlServerDatabaseSchema schema,
StreamSplit lsnSplit) {
@ -114,12 +118,12 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
connectorConfig,
connection,
metadataConnection,
dispatcher,
eventDispatcher,
errorHandler,
Clock.system(),
schema);
this.lsnSplit = lsnSplit;
this.dispatcher = dispatcher;
this.watermarkDispatcher = watermarkDispatcher;
this.errorHandler = errorHandler;
}
@ -132,7 +136,7 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
if (currentLsnOffset.isAtOrAfter(endingOffset)) {
// send streaming end event
try {
dispatcher.dispatchWatermarkEvent(
watermarkDispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
lsnSplit,
currentLsnOffset,
