diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java index fcac38807..449a5e9ff 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java @@ -35,10 +35,16 @@ import com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTas import com.ververica.cdc.connectors.postgres.source.utils.CustomPostgresSchema; import com.ververica.cdc.connectors.postgres.source.utils.TableDiscoveryUtils; import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresObjectUtils; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.PostgresTaskContext; +import io.debezium.connector.postgresql.PostgresTopicSelector; import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges.TableChange; +import io.debezium.schema.TopicSelector; import javax.annotation.Nullable; @@ -47,6 +53,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static io.debezium.connector.postgresql.PostgresObjectUtils.createReplicationConnection; import static io.debezium.connector.postgresql.PostgresObjectUtils.newPostgresValueConverterBuilder; import static io.debezium.connector.postgresql.Utils.currentOffset; @@ -87,6 +94,31 @@ public class PostgresDialect implements JdbcDataSourceDialect { return jdbc; } + public PostgresReplicationConnection openPostgresReplicationConnection() { + try { + PostgresConnection jdbcConnection = + (PostgresConnection) openJdbcConnection(sourceConfig); + PostgresConnectorConfig pgConnectorConfig = sourceConfig.getDbzConnectorConfig(); + TopicSelector topicSelector = PostgresTopicSelector.create(pgConnectorConfig); + PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder = + newPostgresValueConverterBuilder(pgConnectorConfig); + PostgresSchema schema = + PostgresObjectUtils.newSchema( + jdbcConnection, + pgConnectorConfig, + jdbcConnection.getTypeRegistry(), + topicSelector, + valueConverterBuilder.build(jdbcConnection.getTypeRegistry())); + PostgresTaskContext taskContext = + PostgresObjectUtils.newTaskContext(pgConnectorConfig, schema, topicSelector); + return (PostgresReplicationConnection) + createReplicationConnection( + taskContext, jdbcConnection, false, pgConnectorConfig); + } catch (SQLException e) { + throw new RuntimeException("Failed to initialize PostgresReplicationConnection", e); + } + } + @Override public String getName() { return "PostgreSQL"; diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java index e6bb09275..1d7f201ea 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -17,14 +17,27 @@ package com.ververica.cdc.connectors.postgres.source; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.util.FlinkRuntimeException; +import com.ververica.cdc.connectors.base.options.StartupMode; import com.ververica.cdc.connectors.base.options.StartupOptions; +import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner; +import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner; +import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner; +import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState; import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; +import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig; import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory; +import com.ververica.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator; import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetFactory; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import io.debezium.relational.TableId; import java.time.Duration; +import java.util.List; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -240,6 +253,38 @@ public class PostgresSourceBuilder { super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect); } + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) { + final SplitAssigner splitAssigner; + PostgresSourceConfig sourceConfig = (PostgresSourceConfig) configFactory.create(0); + if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) { + try { + final List remainingTables = + dataSourceDialect.discoverDataCollections(sourceConfig); + boolean isTableIdCaseSensitive = + dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig); + splitAssigner = + new HybridSplitAssigner<>( + sourceConfig, + enumContext.currentParallelism(), + remainingTables, + isTableIdCaseSensitive, + dataSourceDialect, + offsetFactory); + } catch (Exception e) { + throw new FlinkRuntimeException( + "Failed to discover captured tables for enumerator", e); + } + } else { + splitAssigner = + new StreamSplitAssigner(sourceConfig, dataSourceDialect, offsetFactory); + } + + return new PostgresSourceEnumerator( + enumContext, sourceConfig, splitAssigner, (PostgresDialect) dataSourceDialect); + } + public static PostgresSourceBuilder builder() { return new PostgresSourceBuilder<>(); } diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 07530d515..d4def8877 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -34,6 +34,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** Factory to create Configuration for Postgres source. */ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { + private static final long serialVersionUID = 1L; + private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue(); private static final String JDBC_DRIVER = "org.postgresql.Driver"; @@ -63,6 +65,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { props.setProperty("database.user", checkNotNull(username)); props.setProperty("database.password", checkNotNull(password)); props.setProperty("database.port", String.valueOf(port)); + // we will create different slot name for each snapshot reader during backfiil task + // execution, the original slot name will be used by enumerator to create slot for + // global stream split props.setProperty("slot.name", checkNotNull(slotName)); // database history props.setProperty( @@ -72,6 +77,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { props.setProperty("database.history.refer.ddl", String.valueOf(true)); // we have to enable heartbeat for PG to make sure DebeziumChangeConsumer#handleBatch // is invoked after job restart + // Enable TCP keep-alive probe to verify that the database connection is still alive + props.setProperty("database.tcpKeepAlive", String.valueOf(true)); props.setProperty("heartbeat.interval.ms", String.valueOf(heartbeatInterval.toMillis())); props.setProperty("include.schema.changes", String.valueOf(includeSchemaChanges)); @@ -88,6 +95,10 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { props.putAll(dbzProperties); } + // The PostgresSource will do snapshot according to its StartupMode. + // Do not need debezium to do the snapshot work. + props.put("snapshot.mode", "never"); + Configuration dbzConfiguration = Configuration.from(props); return new PostgresSourceConfig( subtaskId, diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java new file mode 100644 index 000000000..9b1774ae6 --- /dev/null +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.postgres.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.util.FlinkRuntimeException; + +import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner; +import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator; +import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; +import com.ververica.cdc.connectors.postgres.source.PostgresDialect; +import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; + +/** + * The Postgres source enumerator that enumerates receive the split request and assign the split to + * source readers. + */ +public class PostgresSourceEnumerator extends IncrementalSourceEnumerator { + + private final PostgresDialect postgresDialect; + + public PostgresSourceEnumerator( + SplitEnumeratorContext context, + PostgresSourceConfig sourceConfig, + SplitAssigner splitAssigner, + PostgresDialect postgresDialect) { + super(context, sourceConfig, splitAssigner); + this.postgresDialect = postgresDialect; + } + + @Override + public void start() { + createSlotForGlobalStreamSplit(); + super.start(); + } + + /** + * Create slot for the unique global stream split. + * + *

Currently all startup modes need read the stream split. We need open the slot before + * reading the globalStreamSplit to catch all data changes. + */ + private void createSlotForGlobalStreamSplit() { + try { + PostgresReplicationConnection replicationConnection = + postgresDialect.openPostgresReplicationConnection(); + replicationConnection.createReplicationSlot(); + replicationConnection.close(false); + } catch (Throwable t) { + throw new FlinkRuntimeException( + "Create Slot For Global Stream Split failed due to ", t); + } + } +}