From e8198a6fcb5f89a7707539247e44ae6a7b7e29c6 Mon Sep 17 00:00:00 2001
From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com>
Date: Tue, 26 Sep 2023 14:17:42 +0800
Subject: [PATCH] [postgres] Do not drop replication slot for stream split (#2436)
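
If users set 'debezium.slot.drop.on.stop' = 'true', the replication
slot backing the stream split could be dropped when the task stopped,
e.g. on failover, leaving the job unable to resume from the slot. The
per-split Debezium configuration is now built once in
PostgresSourceFetchTaskContext#configure: snapshot splits narrow
table.include.list to the current table and use a dedicated backfill
slot, while the stream split, which is also the global split, always
forces DROP_SLOT_ON_STOP=false. PostgresScanFetchTask reuses the
configured connector config for the backfill read task instead of
rebuilding it. A new IT case verifies stream reading across a
JobManager failover with slot.drop.on.stop enabled.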

(cherry picked from commit 928ccf10f1175a0467699f3ed61172739443792a)
---
 .../external/JdbcSourceFetchTaskContext.java  |   6 +-
 .../source/fetch/PostgresScanFetchTask.java   |  23 +---
 .../fetch/PostgresSourceFetchTaskContext.java |  16 +++
 .../postgres/source/PostgresSourceITCase.java | 113 ++++++++++++++++++
 4 files changed, 136 insertions(+), 22 deletions(-)

diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
index 6a857df52..cb4b48cb9 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
@@ -46,7 +46,7 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
 
     protected final JdbcSourceConfig sourceConfig;
     protected final JdbcDataSourceDialect dataSourceDialect;
-    protected final CommonConnectorConfig dbzConnectorConfig;
+    protected CommonConnectorConfig dbzConnectorConfig;
     protected final SchemaNameAdjuster schemaNameAdjuster;
 
     public JdbcSourceFetchTaskContext(
@@ -156,6 +156,10 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
         return dbzConnectorConfig;
     }
 
+    public void setDbzConnectorConfig(CommonConnectorConfig dbzConnectorConfig) {
+        this.dbzConnectorConfig = dbzConnectorConfig;
+    }
+
     public SchemaNameAdjuster getSchemaNameAdjuster() {
         return SchemaNameAdjuster.create();
     }
diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
index 715d9b43a..1adb1c6e8 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -29,7 +29,6 @@ import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset;
 import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
 import com.ververica.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
-import io.debezium.config.Configuration;
 import io.debezium.connector.postgresql.PostgresConnectorConfig;
 import io.debezium.connector.postgresql.PostgresOffsetContext;
 import io.debezium.connector.postgresql.PostgresPartition;
@@ -37,7 +36,6 @@ import io.debezium.connector.postgresql.PostgresSchema;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
 import io.debezium.connector.postgresql.spi.SlotState;
-import io.debezium.heartbeat.Heartbeat;
 import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
 import io.debezium.pipeline.source.spi.ChangeEventSource;
@@ -60,8 +58,6 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Objects;
 
-import static io.debezium.connector.postgresql.PostgresConnectorConfig.DROP_SLOT_ON_STOP;
-import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
 import static io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady;
 import static io.debezium.connector.postgresql.Utils.currentOffset;
 import static io.debezium.connector.postgresql.Utils.refreshSchema;
@@ -164,24 +160,9 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
                 PostgresOffsetUtils.getPostgresOffsetContext(
                         loader, backfillSplit.getStartingOffset());
 
-        // we should only capture events for the current table,
-        // otherwise, we may not find corresponding schema
-        PostgresSourceConfig config = (PostgresSourceConfig) ctx.getSourceConfig();
-        Configuration dbzConf =
-                ctx.getDbzConnectorConfig()
-                        .getConfig()
-                        .edit()
-                        .with("table.include.list", split.getTableId().toString())
-                        .with(SLOT_NAME.name(), config.getSlotNameForBackfillTask())
-                        // drop slot for backfill stream split
-                        .with(DROP_SLOT_ON_STOP.name(), true)
-                        // Disable heartbeat event in snapshot split fetcher
-                        .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
-                        .build();
-
         final PostgresStreamFetchTask.StreamSplitReadTask backfillReadTask =
                 new PostgresStreamFetchTask.StreamSplitReadTask(
-                        new PostgresConnectorConfig(dbzConf),
+                        ctx.getDbzConnectorConfig(),
                         ctx.getSnapShotter(),
                         ctx.getConnection(),
                         ctx.getDispatcher(),
@@ -195,7 +176,7 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
         LOG.info(
                 "Execute backfillReadTask for split {} with slot name {}",
                 split,
-                dbzConf.getString(SLOT_NAME.name()));
+                ((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask());
         backfillReadTask.execute(
                 new PostgresChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext);
     }
diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index c90b589fa..c6a0419b1 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -127,6 +127,11 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
                             dbzConfig
                                     .getConfig()
                                     .edit()
+                                    // only capture events for the current table;
+                                    // otherwise, we may not find the corresponding schema
+                                    .with(
+                                            "table.include.list",
+                                            ((SnapshotSplit) sourceSplitBase).getTableId().toString())
                                     .with(
                                             SLOT_NAME.name(),
                                             ((PostgresSourceConfig) sourceConfig)
@@ -136,8 +141,19 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
                                     // Disable heartbeat event in snapshot split fetcher
                                     .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                                     .build());
+        } else {
+            dbzConfig =
+                    new PostgresConnectorConfig(
+                            dbzConfig
+                                    .getConfig()
+                                    .edit()
+                                    // never drop the replication slot for the stream split, which is also the global split
+                                    .with(DROP_SLOT_ON_STOP.name(), false)
+                                    .build());
         }
 
+        LOG.info("PostgresConnectorConfig is {}", dbzConfig.getConfig().asProperties().toString());
+        setDbzConnectorConfig(dbzConfig);
         PostgresConnectorConfig.SnapshotMode snapshotMode =
                 PostgresConnectorConfig.SnapshotMode.parse(
                         dbzConfig.getConfig().getString(SNAPSHOT_MODE));
diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 41ce23f1f..be1ad5fb0 100644
--- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -198,6 +198,64 @@ public class PostgresSourceITCase extends PostgresTestBase {
         }
     }
 
+    @Test
+    public void testDebeziumSlotDropOnStop() throws Exception {
+        String scanStartupMode = DEFAULT_SCAN_STARTUP_MODE;
+        customDatabase.createAndInitialize();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(2);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        String sourceDDL =
+                format(
+                        "CREATE TABLE customers ("
+                                + " id BIGINT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " primary key (id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.startup.mode' = '%s',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '100',"
+                                + " 'slot.name' = '%s', "
+                                + " 'debezium.slot.drop.on.stop' = 'true'"
+                                + ")",
+                        customDatabase.getHost(),
+                        customDatabase.getDatabasePort(),
+                        customDatabase.getUsername(),
+                        customDatabase.getPassword(),
+                        customDatabase.getDatabaseName(),
+                        SCHEMA_NAME,
+                        "customers",
+                        scanStartupMode,
+                        getSlotName());
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from customers");
+
+        // first step: check the snapshot data
+        if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+            checkSnapshotData(
+                    tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
+        }
+
+        // second step: check the stream data
+        checkStreamDataWithDDLDuringFailover(
+                tableResult, FailoverType.JM, FailoverPhase.STREAM, new String[] {"customers"});
+
+        tableResult.getJobClient().get().cancel().get();
+    }
+
     private void testPostgresParallelSource(
             FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
             throws Exception {
@@ -371,6 +429,61 @@ public class PostgresSourceITCase extends PostgresTestBase {
         assertTrue(!hasNextData(iterator));
     }
 
+    private void checkStreamDataWithDDLDuringFailover(
+            TableResult tableResult,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
+        waitUntilJobRunning(tableResult);
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+
+        for (String tableId : captureCustomerTables) {
+            makeFirstPartStreamEvents(
+                    getConnection(),
+                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        }
+
+        // wait for the stream reading
+        Thread.sleep(2000L);
+
+        if (failoverPhase == FailoverPhase.STREAM) {
+            triggerFailover(
+                    failoverType,
+                    jobId,
+                    miniClusterResource.getMiniCluster(),
+                    () -> {
+                        for (String tableId : captureCustomerTables) {
+                            try {
+                                makeSecondPartStreamEvents(
+                                        getConnection(),
+                                        customDatabase.getDatabaseName()
+                                                + '.'
+                                                + SCHEMA_NAME
+                                                + '.'
+                                                + tableId);
+                            } catch (SQLException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                        sleepMs(200);
+                    });
+            waitUntilJobRunning(tableResult);
+        }
+
+        List<String> expectedStreamData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerTables.length; i++) {
+            expectedStreamData.addAll(firstPartStreamEvents);
+            expectedStreamData.addAll(secondPartStreamEvents);
+        }
+        // wait for the stream reading
+        Thread.sleep(2000L);
+
+        assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
+        assertTrue(!hasNextData(iterator));
+    }
+
     private void sleepMs(long millis) {
         try {
             Thread.sleep(millis);