diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java index 0ae6aa466..87a429a7d 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.postgres.source.fetch; import org.apache.flink.util.FlinkRuntimeException; import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher; +import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit; @@ -34,6 +35,8 @@ import io.debezium.connector.postgresql.PostgresOffsetContext; import io.debezium.connector.postgresql.PostgresPartition; import io.debezium.connector.postgresql.PostgresSchema; import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; +import io.debezium.connector.postgresql.spi.SlotState; import io.debezium.heartbeat.Heartbeat; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; @@ -57,6 +60,9 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Objects; +import static io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP; +import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME; +import static io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady; import static io.debezium.connector.postgresql.Utils.currentOffset; import static io.debezium.connector.postgresql.Utils.refreshSchema; @@ -96,12 +102,15 @@ public class PostgresScanFetchTask implements FetchTask { PostgresSnapshotSplitReadTask snapshotSplitReadTask = new PostgresSnapshotSplitReadTask( ctx.getConnection(), + (PostgresReplicationConnection) ctx.getReplicationConnection(), ctx.getDbzConnectorConfig(), ctx.getDatabaseSchema(), ctx.getOffsetContext(), ctx.getDispatcher(), ctx.getSnapshotChangeEventSourceMetrics(), - split); + split, + ctx.getSlotName(), + ctx.getPluginName()); SnapshotSplitChangeEventSourceContext changeEventSourceContext = new SnapshotSplitChangeEventSourceContext(); @@ -132,9 +141,8 @@ public class PostgresScanFetchTask implements FetchTask { 0); // optimization that skip the WAL read when the low watermark >= high watermark - final boolean backfillRequired = - backfillSplit.getEndingOffset().isAfter(backfillSplit.getStartingOffset()); - if (!backfillRequired) { + if (!isBackFillRequired( + backfillSplit.getStartingOffset(), backfillSplit.getEndingOffset())) { LOG.info( "Skip the backfill {} for split {}: low watermark >= high watermark", backfillSplit, @@ -158,17 +166,15 @@ public class PostgresScanFetchTask implements FetchTask { // we should only capture events for the current table, // otherwise, we may not find corresponding schema - PostgresSourceConfig pgSourceConfig = (PostgresSourceConfig) ctx.getSourceConfig(); + PostgresSourceConfig config = (PostgresSourceConfig) ctx.getSourceConfig(); Configuration dbzConf = ctx.getDbzConnectorConfig() .getConfig() .edit() .with("table.include.list", split.getTableId().toString()) - .with( - "slot.name", - pgSourceConfig.getDbzProperties().getProperty("slot.name") - + "_" - + pgSourceConfig.getSubtaskId()) + .with(SLOT_NAME.name(), config.getSlotNameForBackfillTask()) + // drop slot for backfill stream split + .with(DROP_SLOT_ON_STOP.name(), true) // Disable heartbeat event in snapshot split fetcher .with(Heartbeat.HEARTBEAT_INTERVAL, 0) .build(); @@ -186,12 +192,18 @@ public class PostgresScanFetchTask implements FetchTask { ctx.getTaskContext(), ctx.getReplicationConnection(), backfillSplit); - LOG.info("Execute backfillReadTask for split {}", split); - LOG.info("Slot name {}", dbzConf.getString("slot.name")); + LOG.info( + "Execute backfillReadTask for split {} with slot name {}", + split, + dbzConf.getString(SLOT_NAME.name())); backfillReadTask.execute( new PostgresChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext); } + private static boolean isBackFillRequired(Offset lowWatermark, Offset highWatermark) { + return highWatermark.isAfter(lowWatermark); + } + static class SnapshotSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext { @@ -239,6 +251,7 @@ public class PostgresScanFetchTask implements FetchTask { LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class); private final PostgresConnection jdbcConnection; + private final PostgresReplicationConnection replicationConnection; private final PostgresConnectorConfig connectorConfig; private final JdbcSourceEventDispatcher dispatcher; private final SnapshotSplit snapshotSplit; @@ -246,17 +259,23 @@ public class PostgresScanFetchTask implements FetchTask { private final PostgresSchema databaseSchema; private final SnapshotProgressListener snapshotProgressListener; private final Clock clock; + private final String slotName; + private final String pluginName; public PostgresSnapshotSplitReadTask( PostgresConnection jdbcConnection, + PostgresReplicationConnection replicationConnection, PostgresConnectorConfig connectorConfig, PostgresSchema databaseSchema, PostgresOffsetContext previousOffset, - JdbcSourceEventDispatcher dispatcher, - SnapshotProgressListener snapshotProgressListener, - SnapshotSplit snapshotSplit) { + JdbcSourceEventDispatcher dispatcher, + SnapshotProgressListener snapshotProgressListener, + SnapshotSplit snapshotSplit, + String slotName, + String pluginName) { super(connectorConfig, snapshotProgressListener); this.jdbcConnection = jdbcConnection; + this.replicationConnection = replicationConnection; this.connectorConfig = connectorConfig; this.snapshotProgressListener = snapshotProgressListener; this.databaseSchema = databaseSchema; @@ -264,6 +283,8 @@ public class PostgresScanFetchTask implements FetchTask { this.snapshotSplit = snapshotSplit; this.offsetContext = previousOffset; this.clock = Clock.SYSTEM; + this.slotName = slotName; + this.pluginName = pluginName; } @Override @@ -275,8 +296,8 @@ public class PostgresScanFetchTask implements FetchTask { throws Exception { final PostgresSnapshotContext ctx = (PostgresSnapshotContext) snapshotContext; ctx.offset = offsetContext; - refreshSchema(databaseSchema, jdbcConnection, false); - + createSlotForBackFillReadTask(); + refreshSchema(databaseSchema, jdbcConnection, true); final PostgresOffset lowWatermark = currentOffset(jdbcConnection); LOG.info( "Snapshot step 1 - Determining low watermark {} for split {}", @@ -303,6 +324,10 @@ public class PostgresScanFetchTask implements FetchTask { snapshotSplit, highWatermark, WatermarkKind.HIGH); + // release slot timely + if (!isBackFillRequired(lowWatermark, highWatermark)) { + dropSlotForBackFillReadTask(); + } return SnapshotResult.completed(ctx.offset); } @@ -394,6 +419,46 @@ public class PostgresScanFetchTask implements FetchTask { } } + /** + * Create a slot before snapshot reading so that the slot can track the WAL log during the + * snapshot reading phase. + */ + private void createSlotForBackFillReadTask() { + try { + SlotState slotInfo = null; + try { + slotInfo = jdbcConnection.getReplicationSlotState(slotName, pluginName); + } catch (SQLException e) { + LOG.info( + "Unable to load info of replication slot, will try to create the slot"); + } + if (slotInfo == null) { + try { + replicationConnection.createReplicationSlot().orElse(null); + } catch (SQLException ex) { + String message = "Creation of replication slot failed"; + if (ex.getMessage().contains("already exists")) { + message += + "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each."; + } + throw new FlinkRuntimeException(message, ex); + } + } + waitForReplicationSlotReady(30, jdbcConnection, slotName, pluginName); + } catch (Throwable t) { + throw new FlinkRuntimeException(t); + } + } + + /** Drop slot for backfill task and close replication connection. */ + private void dropSlotForBackFillReadTask() { + try { + replicationConnection.close(true); + } catch (Throwable t) { + throw new FlinkRuntimeException(t); + } + } + private Threads.Timer getTableScanLogTimer() { return Threads.timer(clock, LOG_INTERVAL); } diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java index 3df7705cf..ca4c5e657 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java @@ -23,8 +23,10 @@ import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher; import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory; import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; +import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit; import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext; import com.ververica.cdc.connectors.postgres.source.PostgresDialect; +import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig; import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset; import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetFactory; import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetUtils; @@ -43,6 +45,7 @@ import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.data.Envelope; +import io.debezium.heartbeat.Heartbeat; import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory; @@ -64,6 +67,9 @@ import java.util.concurrent.atomic.AtomicReference; import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY; import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY; +import static io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP; +import static io.debezium.connector.postgresql.PostgresConnectorConfig.PLUGIN_NAME; +import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME; import static io.debezium.connector.postgresql.PostgresConnectorConfig.SNAPSHOT_MODE; import static io.debezium.connector.postgresql.PostgresObjectUtils.createReplicationConnection; import static io.debezium.connector.postgresql.PostgresObjectUtils.newPostgresValueConverterBuilder; @@ -158,7 +164,21 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { this.taskContext, jdbcConnection, this.snapShotter.shouldSnapshot(), - dbzConfig)); + sourceSplitBase instanceof StreamSplit + ? dbzConfig + : new PostgresConnectorConfig( + dbzConfig + .getConfig() + .edit() + .with( + SLOT_NAME.name(), + ((PostgresSourceConfig) sourceConfig) + .getSlotNameForBackfillTask()) + // drop slot for backfill stream split + .with(DROP_SLOT_ON_STOP.name(), true) + // Disable heartbeat event in snapshot split fetcher + .with(Heartbeat.HEARTBEAT_INTERVAL, 0) + .build()))); this.queue = new ChangeEventQueue.Builder() @@ -288,4 +308,14 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { public Snapshotter getSnapShotter() { return snapShotter; } + + public String getSlotName() { + return sourceConfig.getDbzProperties().getProperty(SLOT_NAME.name()); + } + + public String getPluginName() { + return PostgresConnectorConfig.LogicalDecoder.parse( + sourceConfig.getDbzProperties().getProperty(PLUGIN_NAME.name())) + .getPostgresPluginName(); + } } diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java index fb5b56ec1..3d6d1775d 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java @@ -210,27 +210,25 @@ public class PostgresStreamFetchTask implements FetchTask { ((PostgresOffset) streamSplit.getEndingOffset()).getLsn()); super.execute(context, partition, offsetContext); if (isBoundedRead()) { - LOG.debug("StreamSplit is bounded read: {}", streamSplit); final PostgresOffset currentOffset = PostgresOffset.of(offsetContext.getOffset()); - - if (currentOffset.isAtOrAfter(streamSplit.getEndingOffset())) { - LOG.info("StreamSplitReadTask finished for {}", streamSplit); - - try { - dispatcher.dispatchWatermarkEvent( - partition.getSourcePartition(), - streamSplit, - currentOffset, - WatermarkKind.END); - } catch (InterruptedException e) { - LOG.error("Send signal event error.", e); - errorHandler.setProducerThrowable( - new FlinkRuntimeException("Error processing WAL signal event", e)); - } - - ((PostgresScanFetchTask.PostgresChangeEventSourceContext) context).finished(); + try { + dispatcher.dispatchWatermarkEvent( + partition.getSourcePartition(), + streamSplit, + currentOffset, + WatermarkKind.END); + LOG.info( + "StreamSplitReadTask finished for {} at {}", + streamSplit, + currentOffset); + } catch (InterruptedException e) { + LOG.error("Send signal event error.", e); + errorHandler.setProducerThrowable( + new FlinkRuntimeException("Error processing WAL signal event", e)); } + + ((PostgresScanFetchTask.PostgresChangeEventSourceContext) context).finished(); } } diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/offset/PostgresOffset.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/offset/PostgresOffset.java index ccacaaef6..9302ea5fd 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/offset/PostgresOffset.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/offset/PostgresOffset.java @@ -37,7 +37,7 @@ public class PostgresOffset extends Offset { public static final PostgresOffset INITIAL_OFFSET = new PostgresOffset(Lsn.INVALID_LSN.asLong(), null, Instant.MIN); public static final PostgresOffset NO_STOPPING_OFFSET = - new PostgresOffset(Lsn.valueOf("FFFFFFFF/FFFFFFFF").asLong(), null, Instant.MAX); + new PostgresOffset(Lsn.NO_STOPPING_LSN.asLong(), null, Instant.MAX); // used by PostgresOffsetFactory PostgresOffset(Map offset) { diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java index 8ad9ea8c1..b375af662 100644 --- a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java +++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.util.FlinkRuntimeException; import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.ReplicationConnection; +import io.debezium.connector.postgresql.spi.SlotState; import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.Clock; @@ -132,4 +133,27 @@ public class PostgresObjectUtils { throw new FlinkRuntimeException( "Failed to create replication connection for " + taskContext); } + + public static void waitForReplicationSlotReady( + int retryTimes, PostgresConnection jdbcConnection, String slotName, String pluginName) + throws SQLException { + int count = 0; + SlotState slotState = jdbcConnection.getReplicationSlotState(slotName, pluginName); + + while (slotState == null && count < retryTimes) { + LOGGER.info("Waiting until the replication slot is ready ..."); + try { + Thread.sleep(2000L); + } catch (InterruptedException e) { + // do nothing + } + count++; + slotState = jdbcConnection.getReplicationSlotState(slotName, pluginName); + } + if (slotState == null) { + throw new IllegalStateException( + String.format( + "The replication slot is not ready after %d seconds.", 2 * retryTimes)); + } + } } diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java index deb7bb754..28e856c03 100644 --- a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java +++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java @@ -15,6 +15,8 @@ import java.nio.ByteBuffer; * current Debezium release java version is 11, so we need to compile this file by java 8 compiler. * More * info. Abstraction of PostgreSQL log sequence number, adapted from {@link LogSequenceNumber}. + * + *

Line 32: add NO_STOPPING_LSN */ public class Lsn implements Comparable { @@ -24,6 +26,9 @@ public class Lsn implements Comparable { */ public static final Lsn INVALID_LSN = new Lsn(0); + /** The max lsn for the wal file. */ + public static final Lsn NO_STOPPING_LSN = Lsn.valueOf("FFFFFFFF/FFFFFFFF"); + private final long value; private Lsn(long value) { @@ -79,7 +84,7 @@ public class Lsn implements Comparable { final ByteBuffer buf = ByteBuffer.allocate(8); buf.putInt(logicalXlog); buf.putInt(segment); - buf.position(0); + ((java.nio.Buffer) buf).position(0); final long value = buf.getLong(); return Lsn.valueOf(value); @@ -103,7 +108,7 @@ public class Lsn implements Comparable { public String asString() { final ByteBuffer buf = ByteBuffer.allocate(8); buf.putLong(value); - buf.position(0); + ((java.nio.Buffer) buf).position(0); final int logicalXlog = buf.getInt(); final int segment = buf.getInt(); @@ -133,6 +138,10 @@ public class Lsn implements Comparable { return this != INVALID_LSN; } + public boolean isNonStopping() { + return this == NO_STOPPING_LSN; + } + @Override public String toString() { return "LSN{" + asString() + '}';