diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/WatermarkDispatcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/WatermarkDispatcher.java new file mode 100644 index 000000000..083cc7e40 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/WatermarkDispatcher.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; +import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind; + +import java.util.Map; + +/** CDC event dispatcher which provides ability to dispatch watermark. 
*/ +@PublicEvolving +public interface WatermarkDispatcher { + + void dispatchWatermarkEvent( + Map sourcePartition, + SourceSplitBase sourceSplit, + Offset watermark, + WatermarkKind watermarkKind) + throws InterruptedException; +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java index deb6794ac..117a52100 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.base.relational; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.relational.handler.SchemaChangeEventHandler; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; @@ -61,7 +62,8 @@ import java.util.Map; * this is useful for downstream to deserialize the {@link HistoryRecord} back. * */ -public class JdbcSourceEventDispatcher

extends EventDispatcher { +public class JdbcSourceEventDispatcher

extends EventDispatcher + implements WatermarkDispatcher { private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceEventDispatcher.class); public static final String HISTORY_RECORD_FIELD = "historyRecord"; @@ -238,6 +240,7 @@ public class JdbcSourceEventDispatcher

extends EventDispatc } } + @Override public void dispatchWatermarkEvent( Map sourcePartition, SourceSplitBase sourceSplit, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/AbstractScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/AbstractScanFetchTask.java index 7dcf22b9c..973a667ab 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/AbstractScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/AbstractScanFetchTask.java @@ -135,7 +135,7 @@ public abstract class AbstractScanFetchTask implements FetchTask { Context context, SourceSplitBase split, Offset lowWatermark) throws Exception { if (context instanceof JdbcSourceFetchTaskContext) { ((JdbcSourceFetchTaskContext) context) - .getDispatcher() + .getWaterMarkDispatcher() .dispatchWatermarkEvent( ((JdbcSourceFetchTaskContext) context) .getPartition() @@ -157,7 +157,7 @@ public abstract class AbstractScanFetchTask implements FetchTask { Context context, SourceSplitBase split, Offset highWatermark) throws Exception { if (context instanceof JdbcSourceFetchTaskContext) { ((JdbcSourceFetchTaskContext) context) - .getDispatcher() + .getWaterMarkDispatcher() .dispatchWatermarkEvent( ((JdbcSourceFetchTaskContext) context) .getPartition() @@ -180,7 +180,7 @@ public abstract class AbstractScanFetchTask implements FetchTask { Context context, SourceSplitBase split, Offset endWatermark) throws Exception { if (context instanceof JdbcSourceFetchTaskContext) { ((JdbcSourceFetchTaskContext) context) - .getDispatcher() + .getWaterMarkDispatcher() .dispatchWatermarkEvent( ((JdbcSourceFetchTaskContext) context) .getPartition() diff 
--git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java index 93120fd20..3892229d4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java @@ -17,16 +17,18 @@ package org.apache.flink.cdc.connectors.base.source.reader.external; +import org.apache.flink.annotation.Internal; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.config.SourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils; import org.apache.flink.table.types.logical.RowType; import io.debezium.config.CommonConnectorConfig; import io.debezium.data.Envelope; import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; import io.debezium.relational.RelationalDatabaseSchema; @@ -43,6 +45,7 @@ import java.util.Map; import java.util.stream.Collectors; /** The context for fetch task that fetching data of snapshot split from JDBC data source. 
*/ +@Internal public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context { protected final JdbcSourceConfig sourceConfig; @@ -171,7 +174,9 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context { public abstract ErrorHandler getErrorHandler(); - public abstract JdbcSourceEventDispatcher getDispatcher(); + public abstract EventDispatcher getEventDispatcher(); + + public abstract WatermarkDispatcher getWaterMarkDispatcher(); public abstract OffsetContext getOffsetContext(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java index dba9fe84a..d6baf843a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2ScanFetchTask.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.connectors.db2.source.fetch; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask; @@ -74,7 +73,7 @@ public class Db2ScanFetchTask extends AbstractScanFetchTask { sourceFetchContext.getSnapshotChangeEventSourceMetrics(), sourceFetchContext.getDatabaseSchema(), sourceFetchContext.getConnection(), - sourceFetchContext.getDispatcher(), + sourceFetchContext.getEventDispatcher(), sourceFetchContext.getSnapshotReceiver(), snapshotSplit); Db2SnapshotSplitChangeEventSourceContext 
changeEventSourceContext = @@ -132,7 +131,8 @@ public class Db2ScanFetchTask extends AbstractScanFetchTask { new Db2ConnectorConfig(dezConf), context.getConnection(), context.getMetaDataConnection(), - context.getDispatcher(), + context.getEventDispatcher(), + context.getWaterMarkDispatcher(), context.getErrorHandler(), context.getDatabaseSchema(), backfillBinlogSplit); @@ -150,7 +150,7 @@ public class Db2ScanFetchTask extends AbstractScanFetchTask { private final Db2ConnectorConfig connectorConfig; private final Db2DatabaseSchema databaseSchema; private final Db2Connection jdbcConnection; - private final JdbcSourceEventDispatcher dispatcher; + private final EventDispatcher dispatcher; private final Clock clock; private final SnapshotSplit snapshotSplit; private final Db2OffsetContext offsetContext; @@ -163,7 +163,7 @@ public class Db2ScanFetchTask extends AbstractScanFetchTask { SnapshotProgressListener snapshotProgressListener, Db2DatabaseSchema databaseSchema, Db2Connection jdbcConnection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher dispatcher, EventDispatcher.SnapshotReceiver snapshotReceiver, SnapshotSplit snapshotSplit) { super(connectorConfig, snapshotProgressListener); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java index 45bf132ea..64e99c32f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java @@ -17,6 +17,7 @@ package 
org.apache.flink.cdc.connectors.db2.source.fetch; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory; @@ -216,7 +217,12 @@ public class Db2SourceFetchTaskContext extends JdbcSourceFetchTaskContext { } @Override - public JdbcSourceEventDispatcher getDispatcher() { + public JdbcSourceEventDispatcher getEventDispatcher() { + return dispatcher; + } + + @Override + public WatermarkDispatcher getWaterMarkDispatcher() { return dispatcher; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2StreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2StreamFetchTask.java index 36d7fe03e..43581c549 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2StreamFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2StreamFetchTask.java @@ -17,7 +17,7 @@ package org.apache.flink.cdc.connectors.db2.source.fetch; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; @@ -34,7 +34,9 @@ import io.debezium.connector.db2.Db2Partition; import io.debezium.connector.db2.Db2StreamingChangeEventSource; import io.debezium.connector.db2.Lsn; import 
io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; +import io.debezium.relational.TableId; import io.debezium.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +63,8 @@ public class Db2StreamFetchTask implements FetchTask { sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getConnection(), sourceFetchContext.getMetaDataConnection(), - sourceFetchContext.getDispatcher(), + sourceFetchContext.getEventDispatcher(), + sourceFetchContext.getWaterMarkDispatcher(), sourceFetchContext.getErrorHandler(), sourceFetchContext.getDatabaseSchema(), split); @@ -96,7 +99,7 @@ public class Db2StreamFetchTask implements FetchTask { private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class); private final StreamSplit lsnSplit; - private final JdbcSourceEventDispatcher dispatcher; + private final WatermarkDispatcher watermarkDispatcher; private final ErrorHandler errorHandler; private ChangeEventSourceContext context; @@ -104,7 +107,8 @@ public class Db2StreamFetchTask implements FetchTask { Db2ConnectorConfig connectorConfig, Db2Connection connection, Db2Connection metadataConnection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher eventDispatcher, + WatermarkDispatcher watermarkDispatcher, ErrorHandler errorHandler, Db2DatabaseSchema schema, StreamSplit lsnSplit) { @@ -112,12 +116,12 @@ public class Db2StreamFetchTask implements FetchTask { connectorConfig, connection, metadataConnection, - dispatcher, + eventDispatcher, errorHandler, Clock.system(), schema); this.lsnSplit = lsnSplit; - this.dispatcher = dispatcher; + this.watermarkDispatcher = watermarkDispatcher; this.errorHandler = errorHandler; } @@ -130,7 +134,7 @@ public class Db2StreamFetchTask implements FetchTask { if (currentLsnOffset.isAtOrAfter(endingOffset)) { // send streaming end event try { - dispatcher.dispatchWatermarkEvent( + 
watermarkDispatcher.dispatchWatermarkEvent( partition.getSourcePartition(), lsnSplit, currentLsnOffset, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/EventProcessorFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/EventProcessorFactory.java index 9676c8b0a..00e8d91d4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/EventProcessorFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/EventProcessorFactory.java @@ -18,7 +18,7 @@ package org.apache.flink.cdc.connectors.oracle.source.reader.fetch; import org.apache.flink.cdc.common.annotation.Internal; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind; import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; @@ -36,7 +36,9 @@ import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfini import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor; import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource; +import io.debezium.relational.TableId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +58,8 @@ public class EventProcessorFactory { 
ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher eventDispatcher, + WatermarkDispatcher watermarkDispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, @@ -70,7 +73,8 @@ public class EventProcessorFactory { context, connectorConfig, jdbcConnection, - dispatcher, + eventDispatcher, + watermarkDispatcher, partition, offsetContext, schema, @@ -83,7 +87,8 @@ public class EventProcessorFactory { context, connectorConfig, jdbcConnection, - dispatcher, + eventDispatcher, + watermarkDispatcher, partition, offsetContext, schema, @@ -95,7 +100,8 @@ public class EventProcessorFactory { context, connectorConfig, jdbcConnection, - dispatcher, + eventDispatcher, + watermarkDispatcher, partition, offsetContext, schema, @@ -117,13 +123,14 @@ public class EventProcessorFactory { private final ErrorHandler errorHandler; private ChangeEventSource.ChangeEventSourceContext context; - private final JdbcSourceEventDispatcher dispatcher; + private final WatermarkDispatcher watermarkDispatcher; public CDCMemoryLogMinerEventProcessor( ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher eventDispatcher, + WatermarkDispatcher watermarkDispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, @@ -134,7 +141,7 @@ public class EventProcessorFactory { context, connectorConfig, jdbcConnection, - dispatcher, + eventDispatcher, partition, offsetContext, schema, @@ -142,14 +149,14 @@ public class EventProcessorFactory { this.redoLogSplit = redoLogSplit; this.errorHandler = errorHandler; this.context = context; - this.dispatcher = dispatcher; + this.watermarkDispatcher = watermarkDispatcher; } @Override protected void processRow(OraclePartition 
partition, LogMinerEventRow row) throws SQLException, InterruptedException { if (reachEndingOffset( - partition, row, redoLogSplit, errorHandler, dispatcher, context)) { + partition, row, redoLogSplit, errorHandler, watermarkDispatcher, context)) { return; } super.processRow(partition, row); @@ -166,13 +173,14 @@ public class EventProcessorFactory { private final ErrorHandler errorHandler; private ChangeEventSource.ChangeEventSourceContext context; - private final JdbcSourceEventDispatcher dispatcher; + private final WatermarkDispatcher watermarkDispatcher; public CDCEmbeddedInfinispanLogMinerEventProcessor( ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher eventDispatcher, + WatermarkDispatcher watermarkDispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, @@ -183,7 +191,7 @@ public class EventProcessorFactory { context, connectorConfig, jdbcConnection, - dispatcher, + eventDispatcher, partition, offsetContext, schema, @@ -191,14 +199,14 @@ public class EventProcessorFactory { this.redoLogSplit = redoLogSplit; this.errorHandler = errorHandler; this.context = context; - this.dispatcher = dispatcher; + this.watermarkDispatcher = watermarkDispatcher; } @Override protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException { if (reachEndingOffset( - partition, row, redoLogSplit, errorHandler, dispatcher, context)) { + partition, row, redoLogSplit, errorHandler, watermarkDispatcher, context)) { return; } super.processRow(partition, row); @@ -215,13 +223,14 @@ public class EventProcessorFactory { private final ErrorHandler errorHandler; private ChangeEventSource.ChangeEventSourceContext context; - private final JdbcSourceEventDispatcher dispatcher; + private final WatermarkDispatcher watermarkDispatcher; public 
CDCRemoteInfinispanLogMinerEventProcessor( ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher eventDispatcher, + WatermarkDispatcher watermarkDispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, @@ -232,7 +241,7 @@ public class EventProcessorFactory { context, connectorConfig, jdbcConnection, - dispatcher, + eventDispatcher, partition, offsetContext, schema, @@ -240,14 +249,14 @@ public class EventProcessorFactory { this.redoLogSplit = redoLogSplit; this.errorHandler = errorHandler; this.context = context; - this.dispatcher = dispatcher; + this.watermarkDispatcher = watermarkDispatcher; } @Override protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException { if (reachEndingOffset( - partition, row, redoLogSplit, errorHandler, dispatcher, context)) { + partition, row, redoLogSplit, errorHandler, watermarkDispatcher, context)) { return; } super.processRow(partition, row); @@ -259,7 +268,7 @@ public class EventProcessorFactory { LogMinerEventRow row, StreamSplit redoLogSplit, ErrorHandler errorHandler, - JdbcSourceEventDispatcher dispatcher, + WatermarkDispatcher dispatcher, ChangeEventSource.ChangeEventSourceContext context) { // check do we need to stop for fetch redo log for snapshot split. 
if (isBoundedRead(redoLogSplit)) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index 0fa72c616..4ce09a1e1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.connectors.oracle.source.reader.fetch; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask; @@ -74,7 +73,7 @@ public class OracleScanFetchTask extends AbstractScanFetchTask { sourceFetchContext.getSnapshotChangeEventSourceMetrics(), sourceFetchContext.getDatabaseSchema(), sourceFetchContext.getConnection(), - sourceFetchContext.getDispatcher(), + sourceFetchContext.getEventDispatcher(), snapshotSplit); StoppableChangeEventSourceContext changeEventSourceContext = new StoppableChangeEventSourceContext(); @@ -136,7 +135,8 @@ public class OracleScanFetchTask extends AbstractScanFetchTask { return new RedoLogSplitReadTask( new OracleConnectorConfig(dezConf), context.getConnection(), - context.getDispatcher(), + context.getEventDispatcher(), + context.getWaterMarkDispatcher(), context.getErrorHandler(), context.getDatabaseSchema(), context.getSourceConfig().getOriginDbzConnectorConfig(), @@ 
-157,7 +157,7 @@ public class OracleScanFetchTask extends AbstractScanFetchTask { private final OracleConnectorConfig connectorConfig; private final OracleDatabaseSchema databaseSchema; private final OracleConnection jdbcConnection; - private final JdbcSourceEventDispatcher dispatcher; + private final EventDispatcher eventDispatcher; private final Clock clock; private final SnapshotSplit snapshotSplit; private final OracleOffsetContext offsetContext; @@ -169,14 +169,14 @@ public class OracleScanFetchTask extends AbstractScanFetchTask { SnapshotProgressListener snapshotProgressListener, OracleDatabaseSchema databaseSchema, OracleConnection jdbcConnection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher eventDispatcher, SnapshotSplit snapshotSplit) { super(connectorConfig, snapshotProgressListener); this.offsetContext = previousOffset; this.connectorConfig = connectorConfig; this.databaseSchema = databaseSchema; this.jdbcConnection = jdbcConnection; - this.dispatcher = dispatcher; + this.eventDispatcher = eventDispatcher; this.clock = Clock.SYSTEM; this.snapshotSplit = snapshotSplit; this.snapshotProgressListener = snapshotProgressListener; @@ -243,7 +243,7 @@ public class OracleScanFetchTask extends AbstractScanFetchTask { private void createDataEvents(OracleSnapshotContext snapshotContext, TableId tableId) throws Exception { EventDispatcher.SnapshotReceiver snapshotReceiver = - dispatcher.getSnapshotChangeEventReceiver(); + eventDispatcher.getSnapshotChangeEventReceiver(); LOG.debug("Snapshotting table {}", tableId); createDataEventsForTable( snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId)); @@ -306,7 +306,7 @@ public class OracleScanFetchTask extends AbstractScanFetchTask { snapshotContext.partition, table.id(), rows); logTimer = getTableScanLogTimer(); } - dispatcher.dispatchSnapshotEvent( + eventDispatcher.dispatchSnapshotEvent( snapshotContext.partition, table.id(), getChangeRecordEmitter(snapshotContext, table.id(), row), diff 
--git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java index 9f13aee9a..c475ada56 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.oracle.source.reader.fetch; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; @@ -230,7 +231,12 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext { } @Override - public JdbcSourceEventDispatcher getDispatcher() { + public JdbcSourceEventDispatcher getEventDispatcher() { + return dispatcher; + } + + @Override + public WatermarkDispatcher getWaterMarkDispatcher() { return dispatcher; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java index 19ecd7254..95fbefdd8 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java @@ -18,7 +18,7 @@ package org.apache.flink.cdc.connectors.oracle.source.reader.fetch; import org.apache.flink.cdc.common.annotation.Internal; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask; @@ -33,6 +33,8 @@ import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource; import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor; import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.relational.TableId; import io.debezium.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +58,8 @@ public class OracleStreamFetchTask implements FetchTask { new RedoLogSplitReadTask( sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getConnection(), - sourceFetchContext.getDispatcher(), + sourceFetchContext.getEventDispatcher(), + sourceFetchContext.getWaterMarkDispatcher(), sourceFetchContext.getErrorHandler(), sourceFetchContext.getDatabaseSchema(), sourceFetchContext.getSourceConfig().getOriginDbzConnectorConfig(), @@ -93,9 +96,9 @@ public class OracleStreamFetchTask implements FetchTask { private static final Logger LOG = LoggerFactory.getLogger(RedoLogSplitReadTask.class); private final StreamSplit 
redoLogSplit; - private final JdbcSourceEventDispatcher dispatcher; + EventDispatcher eventDispatcher; + private final WatermarkDispatcher watermarkDispatcher; private final ErrorHandler errorHandler; - private ChangeEventSourceContext context; private final OracleConnectorConfig connectorConfig; private final OracleConnection connection; @@ -106,7 +109,8 @@ public class OracleStreamFetchTask implements FetchTask { public RedoLogSplitReadTask( OracleConnectorConfig connectorConfig, OracleConnection connection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher eventDispatcher, + WatermarkDispatcher watermarkDispatcher, ErrorHandler errorHandler, OracleDatabaseSchema schema, Configuration jdbcConfig, @@ -115,14 +119,15 @@ public class OracleStreamFetchTask implements FetchTask { super( connectorConfig, connection, - dispatcher, + eventDispatcher, errorHandler, Clock.SYSTEM, schema, jdbcConfig, metrics); this.redoLogSplit = redoLogSplit; - this.dispatcher = dispatcher; + this.eventDispatcher = eventDispatcher; + this.watermarkDispatcher = watermarkDispatcher; this.errorHandler = errorHandler; this.connectorConfig = connectorConfig; this.connection = connection; @@ -135,7 +140,6 @@ public class OracleStreamFetchTask implements FetchTask { ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext offsetContext) { - this.context = context; super.execute(context, partition, offsetContext); } @@ -152,7 +156,8 @@ public class OracleStreamFetchTask implements FetchTask { context, connectorConfig, connection, - dispatcher, + eventDispatcher, + watermarkDispatcher, partition, offsetContext, schema, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java 
new file mode 100644 index 000000000..e2db160e1 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.postgres.source.fetch; + +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; +import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; +import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent; +import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind; + +import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresEventDispatcher; +import io.debezium.heartbeat.HeartbeatFactory; +import io.debezium.pipeline.DataChangeEvent; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.pipeline.spi.ChangeEventCreator; +import io.debezium.relational.TableId; +import io.debezium.schema.DataCollectionFilters; +import io.debezium.schema.DatabaseSchema; +import io.debezium.schema.TopicSelector; +import io.debezium.util.SchemaNameAdjuster; +import org.apache.kafka.connect.source.SourceRecord; + +import java.util.Map; + +/** Postgres Dispatcher for cdc source with watermark. 
*/ +public class CDCPostgresDispatcher extends PostgresEventDispatcher + implements WatermarkDispatcher { + private final String topic; + private final ChangeEventQueue queue; + + public CDCPostgresDispatcher( + PostgresConnectorConfig connectorConfig, + TopicSelector topicSelector, + DatabaseSchema schema, + ChangeEventQueue queue, + DataCollectionFilters.DataCollectionFilter filter, + ChangeEventCreator changeEventCreator, + EventMetadataProvider metadataProvider, + HeartbeatFactory heartbeatFactory, + SchemaNameAdjuster schemaNameAdjuster) { + super( + connectorConfig, + topicSelector, + schema, + queue, + filter, + changeEventCreator, + metadataProvider, + heartbeatFactory, + schemaNameAdjuster); + this.topic = topicSelector.getPrimaryTopic(); + this.queue = queue; + } + + @Override + public void dispatchWatermarkEvent( + Map sourcePartition, + SourceSplitBase sourceSplit, + Offset watermark, + WatermarkKind watermarkKind) + throws InterruptedException { + SourceRecord sourceRecord = + WatermarkEvent.create( + sourcePartition, topic, sourceSplit.splitId(), watermarkKind, watermark); + queue.enqueue(new DataChangeEvent(sourceRecord)); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java index 7487f9eda..548108f94 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.connectors.postgres.source.fetch; -import 
org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask; @@ -28,6 +27,7 @@ import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils; import org.apache.flink.util.FlinkRuntimeException; import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresEventDispatcher; import io.debezium.connector.postgresql.PostgresOffsetContext; import io.debezium.connector.postgresql.PostgresPartition; import io.debezium.connector.postgresql.PostgresSchema; @@ -102,7 +102,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask { ctx.getDbzConnectorConfig(), ctx.getDatabaseSchema(), ctx.getOffsetContext(), - ctx.getDispatcher(), + ctx.getEventDispatcher(), ctx.getSnapshotChangeEventSourceMetrics(), snapshotSplit); @@ -135,8 +135,8 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask { ctx.getDbzConnectorConfig(), ctx.getSnapShotter(), ctx.getConnection(), - ctx.getDispatcher(), - ctx.getPostgresDispatcher(), + ctx.getEventDispatcher(), + ctx.getWaterMarkDispatcher(), ctx.getErrorHandler(), ctx.getTaskContext().getClock(), ctx.getDatabaseSchema(), @@ -215,7 +215,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask { private final PostgresConnection jdbcConnection; private final PostgresConnectorConfig connectorConfig; - private final JdbcSourceEventDispatcher dispatcher; + private final PostgresEventDispatcher eventDispatcher; private final SnapshotSplit snapshotSplit; private final PostgresOffsetContext offsetContext; private final PostgresSchema databaseSchema; @@ -227,7 +227,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask { PostgresConnectorConfig connectorConfig, PostgresSchema 
databaseSchema, PostgresOffsetContext previousOffset, - JdbcSourceEventDispatcher dispatcher, + PostgresEventDispatcher eventDispatcher, SnapshotProgressListener snapshotProgressListener, SnapshotSplit snapshotSplit) { super(connectorConfig, snapshotProgressListener); @@ -235,7 +235,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask { this.connectorConfig = connectorConfig; this.snapshotProgressListener = snapshotProgressListener; this.databaseSchema = databaseSchema; - this.dispatcher = dispatcher; + this.eventDispatcher = eventDispatcher; this.snapshotSplit = snapshotSplit; this.offsetContext = previousOffset; this.clock = Clock.SYSTEM; @@ -260,7 +260,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask { private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId tableId) throws InterruptedException { EventDispatcher.SnapshotReceiver snapshotReceiver = - dispatcher.getSnapshotChangeEventReceiver(); + eventDispatcher.getSnapshotChangeEventReceiver(); LOG.info("Snapshotting table {}", tableId); createDataEventsForTable( snapshotContext, @@ -337,7 +337,7 @@ public class PostgresScanFetchTask extends AbstractScanFetchTask { SnapshotChangeRecordEmitter emitter = new SnapshotChangeRecordEmitter<>( snapshotContext.partition, snapshotContext.offset, row, clock); - dispatcher.dispatchSnapshotEvent( + eventDispatcher.dispatchSnapshotEvent( snapshotContext.partition, table.id(), emitter, snapshotReceiver); } LOG.info( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java index 13452cacb..68c0dec00 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java @@ -17,8 +17,8 @@ package org.apache.flink.cdc.connectors.postgres.source.fetch; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; @@ -26,13 +26,13 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext; import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect; import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; -import org.apache.flink.cdc.connectors.postgres.source.handler.PostgresSchemaChangeEventHandler; import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset; import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory; import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils; import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils; import org.apache.flink.table.types.logical.RowType; +import io.debezium.DebeziumException; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresErrorHandler; @@ -48,6 +48,7 @@ import 
io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.data.Envelope; import io.debezium.heartbeat.Heartbeat; +import io.debezium.heartbeat.HeartbeatFactory; import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory; @@ -60,6 +61,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.schema.TopicSelector; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,8 +92,7 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { private PostgresPartition partition; private PostgresSchema schema; private ErrorHandler errorHandler; - private JdbcSourceEventDispatcher dispatcher; - private PostgresEventDispatcher postgresDispatcher; + private CDCPostgresDispatcher postgresDispatcher; private EventMetadataProvider metadataProvider; private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics; private Snapshotter snapShotter; @@ -215,27 +216,48 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { this.errorHandler = new PostgresErrorHandler(getDbzConnectorConfig(), queue); this.metadataProvider = PostgresObjectUtils.newEventMetadataProvider(); - this.dispatcher = - new JdbcSourceEventDispatcher<>( - dbzConfig, - topicSelector, - schema, - queue, - dbzConfig.getTableFilters().dataCollectionFilter(), - DataChangeEvent::new, - metadataProvider, - schemaNameAdjuster, - new PostgresSchemaChangeEventHandler()); + PostgresConnectorConfig finalDbzConfig = dbzConfig; this.postgresDispatcher = - new PostgresEventDispatcher<>( - dbzConfig, + new CDCPostgresDispatcher( + finalDbzConfig, topicSelector, schema, queue, - 
dbzConfig.getTableFilters().dataCollectionFilter(), + finalDbzConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, metadataProvider, + new HeartbeatFactory<>( + dbzConfig, + topicSelector, + schemaNameAdjuster, + () -> + new PostgresConnection( + finalDbzConfig.getJdbcConfig(), + PostgresConnection.CONNECTION_GENERAL), + exception -> { + String sqlErrorId = exception.getSQLState(); + switch (sqlErrorId) { + case "57P01": + // Postgres error admin_shutdown, see + // https://www.postgresql.org/docs/12/errcodes-appendix.html + throw new DebeziumException( + "Could not execute heartbeat action query (Error: " + + sqlErrorId + + ")", + exception); + case "57P03": + // Postgres error cannot_connect_now, see + // https://www.postgresql.org/docs/12/errcodes-appendix.html + throw new RetriableException( + "Could not execute heartbeat action query (Error: " + + sqlErrorId + + ")", + exception); + default: + break; + } + }), schemaNameAdjuster); ChangeEventSourceMetricsFactory metricsFactory = @@ -261,11 +283,12 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { } @Override - public JdbcSourceEventDispatcher getDispatcher() { - return dispatcher; + public PostgresEventDispatcher getEventDispatcher() { + return postgresDispatcher; } - public PostgresEventDispatcher getPostgresDispatcher() { + @Override + public WatermarkDispatcher getWaterMarkDispatcher() { return postgresDispatcher; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java index f1c76f959..b34db92b1 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java @@ -17,7 +17,7 @@ package org.apache.flink.cdc.connectors.postgres.source.fetch; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; @@ -84,8 +84,8 @@ public class PostgresStreamFetchTask implements FetchTask { sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getSnapShotter(), sourceFetchContext.getConnection(), - sourceFetchContext.getDispatcher(), - sourceFetchContext.getPostgresDispatcher(), + sourceFetchContext.getEventDispatcher(), + sourceFetchContext.getWaterMarkDispatcher(), sourceFetchContext.getErrorHandler(), sourceFetchContext.getTaskContext().getClock(), sourceFetchContext.getDatabaseSchema(), @@ -166,7 +166,7 @@ public class PostgresStreamFetchTask implements FetchTask { public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource { private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class); private final StreamSplit streamSplit; - private final JdbcSourceEventDispatcher dispatcher; + private final WatermarkDispatcher watermarkDispatcher; private final ErrorHandler errorHandler; public ChangeEventSourceContext context; @@ -176,8 +176,8 @@ public class PostgresStreamFetchTask implements FetchTask { PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresConnection connection, - JdbcSourceEventDispatcher 
dispatcher, - PostgresEventDispatcher postgresEventDispatcher, + PostgresEventDispatcher eventDispatcher, + WatermarkDispatcher watermarkDispatcher, ErrorHandler errorHandler, Clock clock, PostgresSchema schema, @@ -189,14 +189,14 @@ public class PostgresStreamFetchTask implements FetchTask { connectorConfig, snapshotter, connection, - postgresEventDispatcher, + eventDispatcher, errorHandler, clock, schema, taskContext, replicationConnection); this.streamSplit = streamSplit; - this.dispatcher = dispatcher; + this.watermarkDispatcher = watermarkDispatcher; this.errorHandler = errorHandler; } @@ -218,7 +218,7 @@ public class PostgresStreamFetchTask implements FetchTask { LOG.debug("StreamSplit is bounded read: {}", streamSplit); final PostgresOffset currentOffset = PostgresOffset.of(offsetContext.getOffset()); try { - dispatcher.dispatchWatermarkEvent( + watermarkDispatcher.dispatchWatermarkEvent( partition.getSourcePartition(), streamSplit, currentOffset, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java index 8d1a6d975..7d2c6dde8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java @@ -44,6 +44,7 @@ import org.apache.flink.util.FlinkRuntimeException; import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.TableId; import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; @@ 
-74,6 +75,7 @@ import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.catalog.Column.physical; import static org.apache.flink.util.Preconditions.checkState; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** IT tests for {@link PostgresSourceBuilder.PostgresIncrementalSource}. */ @@ -253,9 +255,7 @@ public class PostgresSourceITCase extends PostgresTestBase { PostgresTestUtils.FailoverType.NONE, PostgresTestUtils.FailoverPhase.NEVER, new String[] {"customers_no_pk"}, - RestartStrategies.noRestart(), - false, - null); + RestartStrategies.noRestart()); } catch (Exception e) { assertTrue( ExceptionUtils.findThrowableWithMessage( @@ -272,9 +272,7 @@ public class PostgresSourceITCase extends PostgresTestBase { PostgresTestUtils.FailoverType.NONE, PostgresTestUtils.FailoverPhase.NEVER, new String[] {"customers_no_pk"}, - RestartStrategies.noRestart(), - false, - null); + RestartStrategies.noRestart()); } } @@ -287,8 +285,7 @@ public class PostgresSourceITCase extends PostgresTestBase { PostgresTestUtils.FailoverPhase.SNAPSHOT, new String[] {"customers"}, RestartStrategies.fixedDelayRestart(1, 0), - true, - null); + Collections.singletonMap("scan.incremental.snapshot.backfill.skip", "true")); } @Test @@ -660,8 +657,8 @@ public class PostgresSourceITCase extends PostgresTestBase { PostgresTestUtils.FailoverPhase.NEVER, new String[] {"customers"}, RestartStrategies.noRestart(), - false, - chunkColumn); + Collections.singletonMap( + "scan.incremental.snapshot.chunk.key-column", chunkColumn)); } catch (Exception e) { assertTrue( ExceptionUtils.findThrowableWithMessage( @@ -673,6 +670,34 @@ public class PostgresSourceITCase extends PostgresTestBase { } } + @Test + public void testHeartBeat() throws Exception { + try (PostgresConnection connection = getConnection()) { + connection.execute("CREATE TABLE IF NOT EXISTS heart_beat_table(a 
int)"); + connection.commit(); + } + + TableId tableId = new TableId(null, "public", "heart_beat_table"); + try (PostgresConnection connection = getConnection()) { + assertEquals(0, getCountOfTable(connection, tableId)); + } + + Map options = new HashMap<>(); + options.put("heartbeat.interval.ms", "100"); + options.put("debezium.heartbeat.action.query", "INSERT INTO heart_beat_table VALUES(1)"); + testPostgresParallelSource( + 1, + scanStartupMode, + PostgresTestUtils.FailoverType.NONE, + PostgresTestUtils.FailoverPhase.NEVER, + new String[] {"customers"}, + RestartStrategies.noRestart(), + options); + try (PostgresConnection connection = getConnection()) { + assertTrue(getCountOfTable(connection, tableId) > 0); + } + } + private List testBackfillWhenWritingEvents( boolean skipSnapshotBackfill, int fetchSize, @@ -776,8 +801,25 @@ public class PostgresSourceITCase extends PostgresTestBase { failoverPhase, captureCustomerTables, RestartStrategies.fixedDelayRestart(1, 0), - false, - null); + new HashMap<>()); + } + + private void testPostgresParallelSource( + int parallelism, + String scanStartupMode, + PostgresTestUtils.FailoverType failoverType, + PostgresTestUtils.FailoverPhase failoverPhase, + String[] captureCustomerTables, + RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) + throws Exception { + testPostgresParallelSource( + parallelism, + scanStartupMode, + failoverType, + failoverPhase, + captureCustomerTables, + restartStrategyConfiguration, + new HashMap<>()); } private void testPostgresParallelSource( @@ -787,8 +829,7 @@ public class PostgresSourceITCase extends PostgresTestBase { PostgresTestUtils.FailoverPhase failoverPhase, String[] captureCustomerTables, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, - boolean skipSnapshotBackfill, - String chunkColumn) + Map otherOptions) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); @@ -817,9 +858,8 @@ public class PostgresSourceITCase extends PostgresTestBase { + " 'scan.startup.mode' = '%s'," + " 'scan.incremental.snapshot.chunk.size' = '100'," + " 'slot.name' = '%s'," - + " 'scan.incremental.snapshot.backfill.skip' = '%s'," + " 'scan.lsn-commit.checkpoints-num-delay' = '1'" - + "" + + "%s" + ")", customDatabase.getHost(), customDatabase.getDatabasePort(), @@ -830,12 +870,16 @@ public class PostgresSourceITCase extends PostgresTestBase { getTableNameRegex(captureCustomerTables), scanStartupMode, slotName, - skipSnapshotBackfill, - chunkColumn == null + otherOptions.isEmpty() ? "" - : ",'scan.incremental.snapshot.chunk.key-column'='" - + chunkColumn - + "'"); + : "," + + otherOptions.entrySet().stream() + .map( + e -> + String.format( + "'%s'='%s'", + e.getKey(), e.getValue())) + .collect(Collectors.joining(","))); tEnv.executeSql(sourceDDL); TableResult tableResult = tEnv.executeSql("select * from customers"); @@ -1163,4 +1207,23 @@ public class PostgresSourceITCase extends PostgresTestBase { properties.put("dbname", customDatabase.getDatabaseName()); return createConnection(properties); } + + private static long getCountOfTable(JdbcConnection jdbc, TableId tableId) throws SQLException { + // Runs an exact COUNT(1) over the table. For very large tables an + // approximate count from pg_class.reltuples would be faster, see + // https://stackoverflow.com/questions/7943233/fast-way-to-discover-the-row-count-of-a-table-in-postgresql + // (that estimate requires a prior ANALYZE or VACUUM to be accurate).
+ final String query = String.format("SELECT COUNT(1) FROM %s", tableId.toString()); + + return jdbc.queryAndMap( + query, + rs -> { + if (!rs.next()) { + throw new SQLException( + String.format( + "No result returned after running query [%s]", query)); + } + return rs.getLong(1); + }); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java index 15cf97a4c..a2c266cb2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask; @@ -75,7 +74,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask { sourceFetchContext.getSnapshotChangeEventSourceMetrics(), sourceFetchContext.getDatabaseSchema(), sourceFetchContext.getConnection(), - sourceFetchContext.getDispatcher(), + sourceFetchContext.getEventDispatcher(), sourceFetchContext.getSnapshotReceiver(), snapshotSplit); SqlServerSnapshotSplitChangeEventSourceContext changeEventSourceContext = @@ -134,7 +133,8 @@ public class SqlServerScanFetchTask extends 
AbstractScanFetchTask { new SqlServerConnectorConfig(dezConf), context.getConnection(), context.getMetaDataConnection(), - context.getDispatcher(), + context.getEventDispatcher(), + context.getWaterMarkDispatcher(), context.getErrorHandler(), context.getDatabaseSchema(), backfillBinlogSplit); @@ -153,7 +153,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask { private final SqlServerConnectorConfig connectorConfig; private final SqlServerDatabaseSchema databaseSchema; private final SqlServerConnection jdbcConnection; - private final JdbcSourceEventDispatcher dispatcher; + private final EventDispatcher eventDispatcher; private final Clock clock; private final SnapshotSplit snapshotSplit; private final SqlServerOffsetContext offsetContext; @@ -166,7 +166,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask { SnapshotProgressListener snapshotProgressListener, SqlServerDatabaseSchema databaseSchema, SqlServerConnection jdbcConnection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher eventDispatcher, EventDispatcher.SnapshotReceiver snapshotReceiver, SnapshotSplit snapshotSplit) { super(connectorConfig, snapshotProgressListener); @@ -174,7 +174,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask { this.connectorConfig = connectorConfig; this.databaseSchema = databaseSchema; this.jdbcConnection = jdbcConnection; - this.dispatcher = dispatcher; + this.eventDispatcher = eventDispatcher; this.clock = Clock.SYSTEM; this.snapshotSplit = snapshotSplit; this.snapshotProgressListener = snapshotProgressListener; @@ -296,7 +296,7 @@ public class SqlServerScanFetchTask extends AbstractScanFetchTask { snapshotContext.partition, table.id(), rows); logTimer = getTableScanLogTimer(); } - dispatcher.dispatchSnapshotEvent( + eventDispatcher.dispatchSnapshotEvent( snapshotContext.partition, table.id(), getChangeRecordEmitter(snapshotContext, table.id(), row), diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java index 55e7944f6..07390f460 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory; @@ -229,7 +230,12 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext } @Override - public JdbcSourceEventDispatcher getDispatcher() { + public JdbcSourceEventDispatcher getEventDispatcher() { + return dispatcher; + } + + @Override + public WatermarkDispatcher getWaterMarkDispatcher() { return dispatcher; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java index 922ea1e10..5eb4cb9a0 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java @@ -17,7 +17,7 @@ package org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch; -import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher; +import org.apache.flink.cdc.connectors.base.WatermarkDispatcher; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; @@ -35,7 +35,9 @@ import io.debezium.connector.sqlserver.SqlServerOffsetContext; import io.debezium.connector.sqlserver.SqlServerPartition; import io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource; import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource; +import io.debezium.relational.TableId; import io.debezium.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +65,8 @@ public class SqlServerStreamFetchTask implements FetchTask { sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getConnection(), sourceFetchContext.getMetaDataConnection(), - sourceFetchContext.getDispatcher(), + sourceFetchContext.getEventDispatcher(), + sourceFetchContext.getWaterMarkDispatcher(), sourceFetchContext.getErrorHandler(), sourceFetchContext.getDatabaseSchema(), split); @@ -98,7 +101,7 @@ public class SqlServerStreamFetchTask implements FetchTask { private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class); private final StreamSplit lsnSplit; - private final JdbcSourceEventDispatcher dispatcher; + 
private final WatermarkDispatcher watermarkDispatcher; private final ErrorHandler errorHandler; private ChangeEventSourceContext context; @@ -106,7 +109,8 @@ public class SqlServerStreamFetchTask implements FetchTask { SqlServerConnectorConfig connectorConfig, SqlServerConnection connection, SqlServerConnection metadataConnection, - JdbcSourceEventDispatcher dispatcher, + EventDispatcher eventDispatcher, + WatermarkDispatcher watermarkDispatcher, ErrorHandler errorHandler, SqlServerDatabaseSchema schema, StreamSplit lsnSplit) { @@ -114,12 +118,12 @@ public class SqlServerStreamFetchTask implements FetchTask { connectorConfig, connection, metadataConnection, - dispatcher, + eventDispatcher, errorHandler, Clock.system(), schema); this.lsnSplit = lsnSplit; - this.dispatcher = dispatcher; + this.watermarkDispatcher = watermarkDispatcher; this.errorHandler = errorHandler; } @@ -132,7 +136,7 @@ public class SqlServerStreamFetchTask implements FetchTask { if (currentLsnOffset.isAtOrAfter(endingOffset)) { // send streaming end event try { - dispatcher.dispatchWatermarkEvent( + watermarkDispatcher.dispatchWatermarkEvent( partition.getSourcePartition(), lsnSplit, currentLsnOffset,