[postgres] Create slot for backfill task before snapshot reading

Co-authored-by: Leonard Xu <xbjtdcq@gmail.com>
Branch: pull/2205/head
Author: Hang Ruan (2 years ago)
Parent: da5e6a7872
Commit: 1a69cef424

PostgresScanFetchTask.java
@@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.postgres.source.fetch;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
@@ -34,6 +35,8 @@ import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
@@ -57,6 +60,9 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Objects;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
import static io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady;
import static io.debezium.connector.postgresql.Utils.currentOffset;
import static io.debezium.connector.postgresql.Utils.refreshSchema;
@@ -96,12 +102,15 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
PostgresSnapshotSplitReadTask snapshotSplitReadTask =
new PostgresSnapshotSplitReadTask(
ctx.getConnection(),
(PostgresReplicationConnection) ctx.getReplicationConnection(),
ctx.getDbzConnectorConfig(),
ctx.getDatabaseSchema(),
ctx.getOffsetContext(),
ctx.getDispatcher(),
ctx.getSnapshotChangeEventSourceMetrics(),
split);
split,
ctx.getSlotName(),
ctx.getPluginName());
SnapshotSplitChangeEventSourceContext changeEventSourceContext =
new SnapshotSplitChangeEventSourceContext();
@@ -132,9 +141,8 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
0);
// optimization that skip the WAL read when the low watermark >= high watermark
final boolean backfillRequired =
backfillSplit.getEndingOffset().isAfter(backfillSplit.getStartingOffset());
if (!backfillRequired) {
if (!isBackFillRequired(
backfillSplit.getStartingOffset(), backfillSplit.getEndingOffset())) {
LOG.info(
"Skip the backfill {} for split {}: low watermark >= high watermark",
backfillSplit,
@@ -158,17 +166,15 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
// we should only capture events for the current table,
// otherwise, we may not find corresponding schema
PostgresSourceConfig pgSourceConfig = (PostgresSourceConfig) ctx.getSourceConfig();
PostgresSourceConfig config = (PostgresSourceConfig) ctx.getSourceConfig();
Configuration dbzConf =
ctx.getDbzConnectorConfig()
.getConfig()
.edit()
.with("table.include.list", split.getTableId().toString())
.with(
"slot.name",
pgSourceConfig.getDbzProperties().getProperty("slot.name")
+ "_"
+ pgSourceConfig.getSubtaskId())
.with(SLOT_NAME.name(), config.getSlotNameForBackfillTask())
// drop slot for backfill stream split
.with(DROP_SLOT_ON_STOP.name(), true)
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build();
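Note: the hunk above swaps the inline slot-name concatenation for config.getSlotNameForBackfillTask(). That helper's body is not part of this diff; judging from the removed lines it presumably reduces to a per-subtask name like the sketch below, so parallel backfill tasks never contend for one replication slot.

// Hypothetical sketch of PostgresSourceConfig.getSlotNameForBackfillTask();
// the method body is assumed, not shown in this commit. It mirrors the
// removed inline logic: suffix the configured slot name with the subtask id.
public String getSlotNameForBackfillTask() {
    return getDbzProperties().getProperty("slot.name") + "_" + getSubtaskId();
}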
@@ -186,12 +192,18 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
ctx.getTaskContext(),
ctx.getReplicationConnection(),
backfillSplit);
LOG.info("Execute backfillReadTask for split {}", split);
LOG.info("Slot name {}", dbzConf.getString("slot.name"));
LOG.info(
"Execute backfillReadTask for split {} with slot name {}",
split,
dbzConf.getString(SLOT_NAME.name()));
backfillReadTask.execute(
new PostgresChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext);
}
private static boolean isBackFillRequired(Offset lowWatermark, Offset highWatermark) {
return highWatermark.isAfter(lowWatermark);
}
static class SnapshotSplitChangeEventSourceContext
implements ChangeEventSource.ChangeEventSourceContext {
@@ -239,6 +251,7 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);
private final PostgresConnection jdbcConnection;
private final PostgresReplicationConnection replicationConnection;
private final PostgresConnectorConfig connectorConfig;
private final JdbcSourceEventDispatcher<PostgresPartition> dispatcher;
private final SnapshotSplit snapshotSplit;
@@ -246,17 +259,23 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
private final PostgresSchema databaseSchema;
private final SnapshotProgressListener<PostgresPartition> snapshotProgressListener;
private final Clock clock;
private final String slotName;
private final String pluginName;
public PostgresSnapshotSplitReadTask(
PostgresConnection jdbcConnection,
PostgresReplicationConnection replicationConnection,
PostgresConnectorConfig connectorConfig,
PostgresSchema databaseSchema,
PostgresOffsetContext previousOffset,
JdbcSourceEventDispatcher<PostgresPartition> dispatcher,
SnapshotProgressListener<PostgresPartition> snapshotProgressListener,
SnapshotSplit snapshotSplit) {
JdbcSourceEventDispatcher dispatcher,
SnapshotProgressListener snapshotProgressListener,
SnapshotSplit snapshotSplit,
String slotName,
String pluginName) {
super(connectorConfig, snapshotProgressListener);
this.jdbcConnection = jdbcConnection;
this.replicationConnection = replicationConnection;
this.connectorConfig = connectorConfig;
this.snapshotProgressListener = snapshotProgressListener;
this.databaseSchema = databaseSchema;
@@ -264,6 +283,8 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
this.snapshotSplit = snapshotSplit;
this.offsetContext = previousOffset;
this.clock = Clock.SYSTEM;
this.slotName = slotName;
this.pluginName = pluginName;
}
@Override
@@ -275,8 +296,8 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
throws Exception {
final PostgresSnapshotContext ctx = (PostgresSnapshotContext) snapshotContext;
ctx.offset = offsetContext;
refreshSchema(databaseSchema, jdbcConnection, false);
createSlotForBackFillReadTask();
refreshSchema(databaseSchema, jdbcConnection, true);
final PostgresOffset lowWatermark = currentOffset(jdbcConnection);
LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
@@ -303,6 +324,10 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
snapshotSplit,
highWatermark,
WatermarkKind.HIGH);
// release slot timely
if (!isBackFillRequired(lowWatermark, highWatermark)) {
dropSlotForBackFillReadTask();
}
return SnapshotResult.completed(ctx.offset);
}
@@ -394,6 +419,46 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
}
}
/**
* Create a slot before snapshot reading so that the slot can track the WAL log during the
* snapshot reading phase.
*/
private void createSlotForBackFillReadTask() {
try {
SlotState slotInfo = null;
try {
slotInfo = jdbcConnection.getReplicationSlotState(slotName, pluginName);
} catch (SQLException e) {
LOG.info(
"Unable to load info of replication slot, will try to create the slot");
}
if (slotInfo == null) {
try {
replicationConnection.createReplicationSlot().orElse(null);
} catch (SQLException ex) {
String message = "Creation of replication slot failed";
if (ex.getMessage().contains("already exists")) {
message +=
"; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
}
throw new FlinkRuntimeException(message, ex);
}
}
waitForReplicationSlotReady(30, jdbcConnection, slotName, pluginName);
} catch (Throwable t) {
throw new FlinkRuntimeException(t);
}
}
/** Drop slot for backfill task and close replication connection. */
private void dropSlotForBackFillReadTask() {
try {
replicationConnection.close(true);
} catch (Throwable t) {
throw new FlinkRuntimeException(t);
}
}
private Threads.Timer getTableScanLogTimer() {
return Threads.timer(clock, LOG_INTERVAL);
}

PostgresSourceFetchTaskContext.java
@@ -23,8 +23,10 @@ import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
@@ -43,6 +45,7 @@ import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
@@ -64,6 +67,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.PLUGIN_NAME;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SNAPSHOT_MODE;
import static io.debezium.connector.postgresql.PostgresObjectUtils.createReplicationConnection;
import static io.debezium.connector.postgresql.PostgresObjectUtils.newPostgresValueConverterBuilder;
@@ -158,7 +164,21 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
this.taskContext,
jdbcConnection,
this.snapShotter.shouldSnapshot(),
dbzConfig));
sourceSplitBase instanceof StreamSplit
? dbzConfig
: new PostgresConnectorConfig(
dbzConfig
.getConfig()
.edit()
.with(
SLOT_NAME.name(),
((PostgresSourceConfig) sourceConfig)
.getSlotNameForBackfillTask())
// drop slot for backfill stream split
.with(DROP_SLOT_ON_STOP.name(), true)
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build())));
this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
@@ -288,4 +308,14 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
public Snapshotter getSnapShotter() {
return snapShotter;
}
public String getSlotName() {
return sourceConfig.getDbzProperties().getProperty(SLOT_NAME.name());
}
public String getPluginName() {
return PostgresConnectorConfig.LogicalDecoder.parse(
sourceConfig.getDbzProperties().getProperty(PLUGIN_NAME.name()))
.getPostgresPluginName();
}
}
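For reference, getPluginName() above resolves the configured decoder through Debezium's LogicalDecoder enum rather than returning the raw property, since the configured value is mapped to the name of the output plugin PostgreSQL actually loads. A minimal usage sketch (illustrative only; for common values the mapping is the identity):

// Illustrative resolution through Debezium's LogicalDecoder enum:
// parse the configured "plugin.name" value, then take the name of the
// output plugin PostgreSQL loads, e.g. "pgoutput" -> "pgoutput".
String plugin =
        PostgresConnectorConfig.LogicalDecoder.parse("pgoutput").getPostgresPluginName();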

PostgresStreamFetchTask.java
@@ -210,27 +210,25 @@ public class PostgresStreamFetchTask implements FetchTask<SourceSplitBase> {
((PostgresOffset) streamSplit.getEndingOffset()).getLsn());
super.execute(context, partition, offsetContext);
if (isBoundedRead()) {
LOG.debug("StreamSplit is bounded read: {}", streamSplit);
final PostgresOffset currentOffset = PostgresOffset.of(offsetContext.getOffset());
if (currentOffset.isAtOrAfter(streamSplit.getEndingOffset())) {
LOG.info("StreamSplitReadTask finished for {}", streamSplit);
try {
dispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
streamSplit,
currentOffset,
WatermarkKind.END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new FlinkRuntimeException("Error processing WAL signal event", e));
}
((PostgresScanFetchTask.PostgresChangeEventSourceContext) context).finished();
try {
dispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
streamSplit,
currentOffset,
WatermarkKind.END);
LOG.info(
"StreamSplitReadTask finished for {} at {}",
streamSplit,
currentOffset);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new FlinkRuntimeException("Error processing WAL signal event", e));
}
((PostgresScanFetchTask.PostgresChangeEventSourceContext) context).finished();
}
}

PostgresOffset.java
@@ -37,7 +37,7 @@ public class PostgresOffset extends Offset {
public static final PostgresOffset INITIAL_OFFSET =
new PostgresOffset(Lsn.INVALID_LSN.asLong(), null, Instant.MIN);
public static final PostgresOffset NO_STOPPING_OFFSET =
new PostgresOffset(Lsn.valueOf("FFFFFFFF/FFFFFFFF").asLong(), null, Instant.MAX);
new PostgresOffset(Lsn.NO_STOPPING_LSN.asLong(), null, Instant.MAX);
// used by PostgresOffsetFactory
PostgresOffset(Map<String, String> offset) {

PostgresObjectUtils.java
@@ -20,6 +20,7 @@ import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
@@ -132,4 +133,27 @@ public class PostgresObjectUtils {
throw new FlinkRuntimeException(
"Failed to create replication connection for " + taskContext);
}
public static void waitForReplicationSlotReady(
int retryTimes, PostgresConnection jdbcConnection, String slotName, String pluginName)
throws SQLException {
int count = 0;
SlotState slotState = jdbcConnection.getReplicationSlotState(slotName, pluginName);
while (slotState == null && count < retryTimes) {
LOGGER.info("Waiting until the replication slot is ready ...");
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
// do nothing
}
count++;
slotState = jdbcConnection.getReplicationSlotState(slotName, pluginName);
}
if (slotState == null) {
throw new IllegalStateException(
String.format(
"The replication slot is not ready after %d seconds.", 2 * retryTimes));
}
}
}
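For context, the snapshot task above invokes this helper with retryTimes = 30; with the fixed 2-second sleep that gives the slot roughly 60 seconds to appear before the IllegalStateException fires, which matches the "%d seconds" computed as 2 * retryTimes in the message.

// Usage as in createSlotForBackFillReadTask above: ~60s budget (30 polls x 2s).
waitForReplicationSlotReady(30, jdbcConnection, slotName, pluginName);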

Lsn.java
@@ -15,6 +15,8 @@ import java.nio.ByteBuffer;
* current Debezium release java version is 11, so we need to compile this file by java 8 compiler.
* <a href="https://www.morling.dev/blog/bytebuffer-and-the-dreaded-nosuchmethoderror/">More
* info</a>. Abstraction of PostgreSQL log sequence number, adapted from {@link LogSequenceNumber}.
*
* <p>Line 32: add NO_STOPPING_LSN
*/
public class Lsn implements Comparable<Lsn> {
@@ -24,6 +26,9 @@ public class Lsn implements Comparable<Lsn> {
*/
public static final Lsn INVALID_LSN = new Lsn(0);
/** The max lsn for the wal file. */
public static final Lsn NO_STOPPING_LSN = Lsn.valueOf("FFFFFFFF/FFFFFFFF");
private final long value;
private Lsn(long value) {
@@ -79,7 +84,7 @@
final ByteBuffer buf = ByteBuffer.allocate(8);
buf.putInt(logicalXlog);
buf.putInt(segment);
buf.position(0);
((java.nio.Buffer) buf).position(0);
final long value = buf.getLong();
return Lsn.valueOf(value);
@@ -103,7 +108,7 @@
public String asString() {
final ByteBuffer buf = ByteBuffer.allocate(8);
buf.putLong(value);
buf.position(0);
((java.nio.Buffer) buf).position(0);
final int logicalXlog = buf.getInt();
final int segment = buf.getInt();
@@ -133,6 +138,10 @@
return this != INVALID_LSN;
}
public boolean isNonStopping() {
return this == NO_STOPPING_LSN;
}
@Override
public String toString() {
return "LSN{" + asString() + '}';
