[postgres] Prepare a slot for the unique global stream split

Co-authored-by: Leonard Xu <xbjtdcq@gmail.com>
pull/2205/head
Hang Ruan 2 years ago
parent debd6ef404
commit da5e6a7872

@ -35,10 +35,16 @@ import com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTas
import com.ververica.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
import com.ververica.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresObjectUtils;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresTopicSelector;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.schema.TopicSelector;
import javax.annotation.Nullable;
@ -47,6 +53,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static io.debezium.connector.postgresql.PostgresObjectUtils.createReplicationConnection;
import static io.debezium.connector.postgresql.PostgresObjectUtils.newPostgresValueConverterBuilder;
import static io.debezium.connector.postgresql.Utils.currentOffset;
@ -87,6 +94,31 @@ public class PostgresDialect implements JdbcDataSourceDialect {
return jdbc;
}
/**
 * Opens a {@link PostgresReplicationConnection} for the configured source.
 *
 * <p>A JDBC connection is opened first and used to build the schema and task context from which
 * the replication connection is created. If any of these steps fails, the JDBC connection is
 * closed before the failure is propagated so that a failed attempt does not leak it.
 *
 * @return the newly created replication connection; it is not closed by this method
 * @throws RuntimeException if a {@link SQLException} occurs while building the connection
 */
public PostgresReplicationConnection openPostgresReplicationConnection() {
    PostgresConnection jdbcConnection = null;
    try {
        jdbcConnection = (PostgresConnection) openJdbcConnection(sourceConfig);
        PostgresConnectorConfig pgConnectorConfig = sourceConfig.getDbzConnectorConfig();
        TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(pgConnectorConfig);
        PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder =
                newPostgresValueConverterBuilder(pgConnectorConfig);
        PostgresSchema schema =
                PostgresObjectUtils.newSchema(
                        jdbcConnection,
                        pgConnectorConfig,
                        jdbcConnection.getTypeRegistry(),
                        topicSelector,
                        valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
        PostgresTaskContext taskContext =
                PostgresObjectUtils.newTaskContext(pgConnectorConfig, schema, topicSelector);
        // NOTE(review): on success the JDBC connection is handed to the replication connection
        // factory and is not closed here; confirm who owns it afterwards.
        return (PostgresReplicationConnection)
                createReplicationConnection(
                        taskContext, jdbcConnection, false, pgConnectorConfig);
    } catch (SQLException | RuntimeException e) {
        // Nobody else holds a reference to the JDBC connection at this point, so release it
        // here; a failure while closing is attached to the original error instead of hiding it.
        if (jdbcConnection != null) {
            try {
                jdbcConnection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
        }
        if (e instanceof RuntimeException) {
            // Runtime failures were never wrapped by this method; keep propagating them as-is.
            throw (RuntimeException) e;
        }
        throw new RuntimeException("Failed to initialize PostgresReplicationConnection", e);
    }
}
@Override
public String getName() {
return "PostgreSQL";

@ -17,14 +17,27 @@
package com.ververica.cdc.connectors.postgres.source;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.base.options.StartupMode;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner;
import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner;
import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import com.ververica.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.relational.TableId;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
@ -240,6 +253,38 @@ public class PostgresSourceBuilder<T> {
super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect);
}
/**
 * Creates the split enumerator for this source.
 *
 * <p>When the startup mode is {@link StartupMode#INITIAL}, the captured tables are discovered and
 * a {@link HybridSplitAssigner} is used; for every other startup mode a {@link
 * StreamSplitAssigner} is used. The chosen assigner is wrapped in a {@link
 * PostgresSourceEnumerator}.
 *
 * @param enumContext the enumerator context supplied by the runtime
 * @return a {@link PostgresSourceEnumerator} backed by the selected split assigner
 * @throws FlinkRuntimeException if building the hybrid assigner (including table discovery)
 *     fails
 */
@Override
public SplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(
        SplitEnumeratorContext<SourceSplitBase> enumContext) {
    final PostgresSourceConfig config = (PostgresSourceConfig) configFactory.create(0);
    final boolean startsWithSnapshot =
            config.getStartupOptions().startupMode == StartupMode.INITIAL;

    final SplitAssigner assigner;
    if (!startsWithSnapshot) {
        // No snapshot phase: only the stream split has to be assigned.
        assigner = new StreamSplitAssigner(config, dataSourceDialect, offsetFactory);
    } else {
        try {
            final List<TableId> capturedTables =
                    dataSourceDialect.discoverDataCollections(config);
            final boolean caseSensitiveIds =
                    dataSourceDialect.isDataCollectionIdCaseSensitive(config);
            assigner =
                    new HybridSplitAssigner<>(
                            config,
                            enumContext.currentParallelism(),
                            capturedTables,
                            caseSensitiveIds,
                            dataSourceDialect,
                            offsetFactory);
        } catch (Exception e) {
            throw new FlinkRuntimeException(
                    "Failed to discover captured tables for enumerator", e);
        }
    }

    return new PostgresSourceEnumerator(
            enumContext, config, assigner, (PostgresDialect) dataSourceDialect);
}
/**
 * Creates a new, empty {@link PostgresSourceBuilder}.
 *
 * @param <T> the type parameter of the returned builder
 * @return a fresh builder instance
 */
public static <T> PostgresSourceBuilder<T> builder() {
    return new PostgresSourceBuilder<T>();
}

@ -34,6 +34,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/** Factory to create Configuration for Postgres source. */
public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
private static final long serialVersionUID = 1L;
private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue();
private static final String JDBC_DRIVER = "org.postgresql.Driver";
@ -63,6 +65,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
props.setProperty("database.user", checkNotNull(username));
props.setProperty("database.password", checkNotNull(password));
props.setProperty("database.port", String.valueOf(port));
// A different slot name is created for each snapshot reader during backfill task
// execution; the original slot name is used by the enumerator to create the slot for
// the global stream split.
props.setProperty("slot.name", checkNotNull(slotName));
// database history
props.setProperty(
@ -72,6 +77,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
props.setProperty("database.history.refer.ddl", String.valueOf(true));
// we have to enable heartbeat for PG to make sure DebeziumChangeConsumer#handleBatch
// is invoked after job restart
// Enable TCP keep-alive probe to verify that the database connection is still alive
props.setProperty("database.tcpKeepAlive", String.valueOf(true));
props.setProperty("heartbeat.interval.ms", String.valueOf(heartbeatInterval.toMillis()));
props.setProperty("include.schema.changes", String.valueOf(includeSchemaChanges));
@ -88,6 +95,10 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
props.putAll(dbzProperties);
}
// The PostgresSource performs the snapshot itself according to its StartupMode,
// so Debezium does not need to do any snapshot work.
props.put("snapshot.mode", "never");
Configuration dbzConfiguration = Configuration.from(props);
return new PostgresSourceConfig(
subtaskId,

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres.source.enumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
/**
 * The Postgres source enumerator that receives split requests and assigns splits to source
 * readers.
 *
 * <p>On {@link #start()} it first creates the replication slot used by the global stream split
 * and then delegates to the base enumerator.
 */
public class PostgresSourceEnumerator extends IncrementalSourceEnumerator {

    /** Dialect used to open the replication connection for slot creation. */
    private final PostgresDialect postgresDialect;

    /**
     * Creates the enumerator.
     *
     * @param context the enumerator context passed to the base enumerator
     * @param sourceConfig the source configuration passed to the base enumerator
     * @param splitAssigner the split assigner passed to the base enumerator
     * @param postgresDialect the dialect used to open the replication connection
     */
    public PostgresSourceEnumerator(
            SplitEnumeratorContext<SourceSplitBase> context,
            PostgresSourceConfig sourceConfig,
            SplitAssigner splitAssigner,
            PostgresDialect postgresDialect) {
        super(context, sourceConfig, splitAssigner);
        this.postgresDialect = postgresDialect;
    }

    /** Creates the slot for the global stream split, then starts the base enumerator. */
    @Override
    public void start() {
        createSlotForGlobalStreamSplit();
        super.start();
    }

    /**
     * Create slot for the unique global stream split.
     *
     * <p>Currently all startup modes need to read the stream split. The slot has to be opened
     * before reading the globalStreamSplit in order to catch all data changes.
     *
     * @throws FlinkRuntimeException if opening the connection, creating the slot or closing the
     *     connection fails
     */
    private void createSlotForGlobalStreamSplit() {
        try {
            PostgresReplicationConnection replicationConnection =
                    postgresDialect.openPostgresReplicationConnection();
            try {
                replicationConnection.createReplicationSlot();
            } finally {
                // Always release the connection, even when slot creation fails; close(false)
                // is the same call the success path used before.
                replicationConnection.close(false);
            }
        } catch (Throwable t) {
            throw new FlinkRuntimeException(
                    "Create Slot For Global Stream Split failed due to ", t);
        }
    }
}
Loading…
Cancel
Save