diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java
index 19f63d077..f2f125a1e 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java
@@ -53,6 +53,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static io.debezium.connector.postgresql.PostgresConnectorConfig.PLUGIN_NAME;
+import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
 import static io.debezium.connector.postgresql.PostgresObjectUtils.createReplicationConnection;
 import static io.debezium.connector.postgresql.PostgresObjectUtils.newPostgresValueConverterBuilder;
 import static io.debezium.connector.postgresql.Utils.currentOffset;
@@ -94,6 +96,10 @@ public class PostgresDialect implements JdbcDataSourceDialect {
         return jdbc;
     }
 
+    public PostgresConnection openJdbcConnection() {
+        return (PostgresConnection) openJdbcConnection(sourceConfig);
+    }
+
     public PostgresReplicationConnection openPostgresReplicationConnection() {
         try {
             PostgresConnection jdbcConnection =
@@ -215,4 +221,12 @@ public class PostgresDialect implements JdbcDataSourceDialect {
             streamFetchTask.commitCurrentOffset();
         }
     }
+
+    public String getSlotName() {
+        return sourceConfig.getDbzProperties().getProperty(SLOT_NAME.name());
+    }
+
+    public String getPluginName() {
+        return sourceConfig.getDbzProperties().getProperty(PLUGIN_NAME.name());
+    }
 }
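Note: the three new public methods above are consumed together by the enumerator change below. A minimal sketch of the intended call pattern (illustrative only, not part of the patch; `dialect` stands for a fully configured PostgresDialect instance):

    try (PostgresConnection connection = dialect.openJdbcConnection()) {
        // getReplicationSlotState returns null when the slot does not exist on the server yet
        SlotState state =
                connection.getReplicationSlotState(dialect.getSlotName(), dialect.getPluginName());
        boolean slotAlreadyExists = state != null;
    } catch (SQLException e) {
        // surface the failure; the enumerator below wraps it the same way
        throw new RuntimeException(e);
    }
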
diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
index c05831634..0c7f8a00d 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
@@ -24,7 +24,11 @@ import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnum
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
 import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+
+import java.sql.SQLException;
 
 /**
  * The Postgres source enumerator that enumerates receive the split request and assign the split to
@@ -56,6 +60,24 @@ public class PostgresSourceEnumerator extends IncrementalSourceEnumerator {
      * reading the globalStreamSplit to catch all data changes.
      */
     private void createSlotForGlobalStreamSplit() {
+        SlotState slotInfo = null;
+        try (PostgresConnection connection = postgresDialect.openJdbcConnection()) {
+            slotInfo =
+                    connection.getReplicationSlotState(
+                            postgresDialect.getSlotName(), postgresDialect.getPluginName());
+        } catch (SQLException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to get the replication slot info, the slot name is %s.",
+                            postgresDialect.getSlotName()),
+                    e);
+        }
+
+        // Skip creating the replication slot when the slot already exists.
+        if (slotInfo != null) {
+            return;
+        }
+
         try {
             PostgresReplicationConnection replicationConnection =
                     postgresDialect.openPostgresReplicationConnection();
diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
index f7da893bc..715d9b43a 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -109,7 +109,7 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
                         ctx.getDispatcher(),
                         ctx.getSnapshotChangeEventSourceMetrics(),
                         split,
-                        ctx.getSlotName(),
+                        ((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask(),
                         ctx.getPluginName());
 
         SnapshotSplitChangeEventSourceContext changeEventSourceContext =
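Note: with this change, each snapshot split's backfill phase reads from a dedicated slot name instead of the globally configured one, so backfill readers cannot collide with the global stream slot. The patch does not show the body of PostgresSourceConfig.getSlotNameForBackfillTask(); one plausible shape, assuming the config exposes its subtask id (a hypothetical sketch, the real implementation may differ):

    // Hypothetical sketch of the accessor called above, not taken from this patch.
    public String getSlotNameForBackfillTask() {
        // e.g. base slot "flink" becomes "flink_3" for subtask 3,
        // keeping backfill slot names unique per parallel reader
        return getDbzProperties().getProperty(SLOT_NAME.name()) + "_" + getSubtaskId();
    }
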
diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index d96a8e4c7..c90b589fa 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -22,8 +22,8 @@ import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
 import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
 import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
 import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
+import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
-import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
 import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
 import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
 import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
@@ -121,6 +121,22 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
     public void configure(SourceSplitBase sourceSplitBase) {
         LOG.debug("Configuring PostgresSourceFetchTaskContext for split: {}", sourceSplitBase);
         PostgresConnectorConfig dbzConfig = getDbzConnectorConfig();
+        if (sourceSplitBase instanceof SnapshotSplit) {
+            dbzConfig =
+                    new PostgresConnectorConfig(
+                            dbzConfig
+                                    .getConfig()
+                                    .edit()
+                                    .with(
+                                            SLOT_NAME.name(),
+                                            ((PostgresSourceConfig) sourceConfig)
+                                                    .getSlotNameForBackfillTask())
+                                    // drop slot for backfill stream split
+                                    .with(DROP_SLOT_ON_STOP.name(), true)
+                                    // Disable heartbeat event in snapshot split fetcher
+                                    .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
+                                    .build());
+        }
 
         PostgresConnectorConfig.SnapshotMode snapshotMode =
                 PostgresConnectorConfig.SnapshotMode.parse(
@@ -164,21 +180,7 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
                         this.taskContext,
                         jdbcConnection,
                         this.snapShotter.shouldSnapshot(),
-                        sourceSplitBase instanceof StreamSplit
-                                ? dbzConfig
-                                : new PostgresConnectorConfig(
-                                        dbzConfig
-                                                .getConfig()
-                                                .edit()
-                                                .with(
-                                                        SLOT_NAME.name(),
-                                                        ((PostgresSourceConfig) sourceConfig)
-                                                                .getSlotNameForBackfillTask())
-                                                // drop slot for backfill stream split
-                                                .with(DROP_SLOT_ON_STOP.name(), true)
-                                                // Disable heartbeat event in snapshot split fetcher
-                                                .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
-                                                .build())));
+                        dbzConfig));
 
         this.queue =
                 new ChangeEventQueue.Builder<DataChangeEvent>()
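Note: moving the override from connection creation into configure() means the rewritten dbzConfig is seen by everything built afterwards (dispatcher, task context, replication connection) rather than by the connection alone. The three Debezium settings involved, shown in isolation on a plain io.debezium.config.Configuration (a sketch with illustrative values, assuming standard Debezium Postgres property keys):

    // Illustrative: the same edit() pattern on a bare Debezium Configuration.
    Configuration base = Configuration.create().with("slot.name", "flink").build();
    Configuration backfill =
            base.edit()
                    .with("slot.name", "flink_backfill_0") // per-task backfill slot (example name)
                    .with("slot.drop.on.stop", true)       // discard the slot when backfill ends
                    .with("heartbeat.interval.ms", 0)      // no heartbeats during snapshot reads
                    .build();
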
diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresSQLSourceTest.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgreSQLSourceTest.java
similarity index 99%
rename from flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresSQLSourceTest.java
rename to flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgreSQLSourceTest.java
index ee04070ca..a7000951f 100644
--- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresSQLSourceTest.java
+++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgreSQLSourceTest.java
@@ -77,8 +77,8 @@ import static org.junit.Assert.fail;
 import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
 
 /** Tests for {@link PostgreSQLSource} which also heavily tests {@link DebeziumSourceFunction}. */
-public class PostgresSQLSourceTest extends PostgresTestBase {
-    private static final Logger LOG = LoggerFactory.getLogger(PostgresSQLSourceTest.class);
+public class PostgreSQLSourceTest extends PostgresTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(PostgreSQLSourceTest.class);
     private static final String SLOT_NAME = "flink";
     // These tests only passes at the docker postgres:9.6
     private static final PostgreSQLContainer<?> POSTGRES_CONTAINER_OLD =
diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java
index 6903285df..87bd4d8b7 100644
--- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java
+++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java
@@ -16,6 +16,7 @@
 
 package com.ververica.cdc.connectors.postgres;
 
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import io.debezium.config.Configuration;
@@ -137,4 +138,40 @@ public abstract class PostgresTestBase extends AbstractTestBase {
         Configuration config = Configuration.from(properties);
         return new PostgresConnection(JdbcConfiguration.adapt(config), "test-connection");
     }
+
+    protected void waitForSnapshotStarted(String sinkName) throws InterruptedException {
+        while (sinkSize(sinkName) == 0) {
+            Thread.sleep(300);
+        }
+    }
+
+    protected void waitForSinkResult(String sinkName, List<String> expected)
+            throws InterruptedException {
+        List<String> actual = TestValuesTableFactory.getResults(sinkName);
+        actual = actual.stream().sorted().collect(Collectors.toList());
+        while (actual.size() != expected.size() || !actual.equals(expected)) {
+            actual =
+                    TestValuesTableFactory.getResults(sinkName).stream()
+                            .sorted()
+                            .collect(Collectors.toList());
+            Thread.sleep(1000);
+        }
+    }
+
+    protected void waitForSinkSize(String sinkName, int expectedSize) throws InterruptedException {
+        while (sinkSize(sinkName) < expectedSize) {
+            Thread.sleep(100);
+        }
+    }
+
+    protected int sinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getRawResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
 }
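Note: the helpers promoted into PostgresTestBase poll the values sink with three different completion criteria. Typical usage in a subclass looks like this (sink name, row count, and the `expectedRows` list are illustrative):

    TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
    waitForSnapshotStarted("sink");            // until the first raw changelog entry arrives
    waitForSinkSize("sink", 9);                // until at least 9 raw changelog entries exist
    waitForSinkResult("sink", expectedRows);   // until materialized rows equal expectedRows exactly
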
diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index 3e7180506..3903f6f75 100644
--- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -674,28 +674,4 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
 
         result.getJobClient().get().cancel().get();
     }
-
-    private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
-        while (sinkSize(sinkName) == 0) {
-            Thread.sleep(300);
-        }
-    }
-
-    private static void waitForSinkSize(String sinkName, int expectedSize)
-            throws InterruptedException {
-        while (sinkSize(sinkName) < expectedSize) {
-            Thread.sleep(100);
-        }
-    }
-
-    private static int sinkSize(String sinkName) {
-        synchronized (TestValuesTableFactory.class) {
-            try {
-                return TestValuesTableFactory.getRawResults(sinkName).size();
-            } catch (IllegalArgumentException e) {
-                // job is not started yet
-                return 0;
-            }
-        }
-    }
 }
diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
new file mode 100644
index 000000000..f66a1338e
--- /dev/null
+++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLSavepointITCase.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.postgres.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.ververica.cdc.connectors.postgres.PostgresTestBase;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
+
+/** Integration tests for PostgreSQL to start from a savepoint. */
+public class PostgreSQLSavepointITCase extends PostgresTestBase {
+
+    @Before
+    public void before() {
+        TestValuesTableFactory.clearAllData();
+    }
+
+    @Test
+    public void testSavepoint() throws Exception {
+        testRestartFromSavepoint();
+    }
+
+    private void testRestartFromSavepoint() throws Exception {
+        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+
+        final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        temporaryFolder.create();
+        final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
+        String finishedSavePointPath = null;
+
+        StreamExecutionEnvironment env = getStreamExecutionEnvironment(finishedSavePointPath, 4);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE debezium_source ("
+                                + " id INT NOT NULL,"
+                                + " name STRING,"
+                                + " description STRING,"
+                                + " weight DECIMAL(10,3),"
+                                + " PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '2',"
+                                + " 'slot.name' = '%s'"
+                                + ")",
+                        POSTGRES_CONTAINER.getHost(),
+                        POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+                        POSTGRES_CONTAINER.getUsername(),
+                        POSTGRES_CONTAINER.getPassword(),
+                        POSTGRES_CONTAINER.getDatabaseName(),
+                        "inventory",
+                        "products",
+                        getSlotName());
+        String sinkDDL =
+                "CREATE TABLE sink "
+                        + " WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ") LIKE debezium_source (EXCLUDING OPTIONS)";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        // async submit job
+        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+        JobClient jobClient = result.getJobClient().get();
+
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
+            statement.execute(
+                    "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+            statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
+            statement.execute("DELETE FROM inventory.products WHERE id=111;");
+        }
+
+        // wait for the source to start up; we don't have a better way to wait for it, so sleep for now
+        Thread.sleep(10000L);
+        waitForSinkResult(
+                "sink",
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.140]",
+                        "+I[102, car battery, 12V car battery, 8.100]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
+                        "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
+                        "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
+                        "+I[107, rocks, box of assorted rocks, 5.300]",
+                        "+I[108, jacket, water resistent black wind breaker, 0.100]",
+                        "+I[109, spare tire, 24 inch spare tire, 22.200]",
+                        "+I[110, jacket, new water resistent white wind breaker, 0.500]"));
+
+        finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+        jobClient.cancel().get();
+
+        env = getStreamExecutionEnvironment(finishedSavePointPath, 4);
+        tEnv = StreamTableEnvironment.create(env);
+
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 112
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
+            statement.execute(
+                    "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=112;");
+            statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=113;");
+            statement.execute("DELETE FROM inventory.products WHERE id=113;");
+        }
+
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        // async submit job
+        result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+        jobClient = result.getJobClient().get();
+
+        waitForSinkSize("sink", 15);
+
+        String[] expected =
+                new String[] {
+                    "+I[101, scooter, Small 2-wheel scooter, 3.140]",
+                    "+I[102, car battery, 12V car battery, 8.100]",
+                    "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
+                    "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
+                    "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
+                    "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
+                    "+I[107, rocks, box of assorted rocks, 5.300]",
+                    "+I[108, jacket, water resistent black wind breaker, 0.100]",
+                    "+I[109, spare tire, 24 inch spare tire, 22.200]",
+                    "+I[110, jacket, new water resistent white wind breaker, 0.500]",
+                    "+I[112, jacket, new water resistent white wind breaker, 0.500]"
+                };
+
+        List<String> actual = TestValuesTableFactory.getResults("sink");
+        assertThat(actual, containsInAnyOrder(expected));
+
+        jobClient.cancel().get();
+        temporaryFolder.delete();
+    }
+
+    private StreamExecutionEnvironment getStreamExecutionEnvironment(
+            String finishedSavePointPath, int parallelism) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        if (finishedSavePointPath != null) {
+            // restore from savepoint
+            // hack for test: reflectively set the savepoint path on the environment's private configuration field
+            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+            Class<?> clazz =
+                    classLoader.loadClass(
+                            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
+            Field field = clazz.getDeclaredField("configuration");
+            field.setAccessible(true);
+            Configuration configuration = (Configuration) field.get(env);
+            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
+        }
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        return env;
+    }
+
+    private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory)
+            throws ExecutionException, InterruptedException {
+        int retryTimes = 0;
+        // retry at most 600 times with 100 ms between attempts, i.e. wait at most 1 minute
+        while (retryTimes < 600) {
+            try {
+                return jobClient.triggerSavepoint(savepointDirectory).get();
+            } catch (Exception e) {
+                Optional<CheckpointException> exception =
+                        ExceptionUtils.findThrowable(e, CheckpointException.class);
+                if (exception.isPresent()
+                        && exception.get().getMessage().contains("Checkpoint triggering task")) {
+                    Thread.sleep(100);
+                    retryTimes++;
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return null;
+    }
+}
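Note: the reflection in getStreamExecutionEnvironment exists only because the test patches the configuration of an environment it has already created. In regular application code the savepoint path would be supplied up front when the environment is built, roughly like this (the path below is illustrative):

    Configuration conf = new Configuration();
    conf.setString(SavepointConfigOptions.SAVEPOINT_PATH, "file:///tmp/savepoints/savepoint-ab12cd");
    // getExecutionEnvironment(Configuration) has been available since Flink 1.12
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);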