[postgres] Fix the slot name conflict bug (#2251) (#2281)

Commit d3ed1a7714 (parent 10ce714551) · branch pull/2285/head · authored by Hang Ruan, committed by GitHub

@@ -53,6 +53,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static io.debezium.connector.postgresql.PostgresConnectorConfig.PLUGIN_NAME;
+import static io.debezium.connector.postgresql.PostgresConnectorConfig.SLOT_NAME;
 import static io.debezium.connector.postgresql.PostgresObjectUtils.createReplicationConnection;
 import static io.debezium.connector.postgresql.PostgresObjectUtils.newPostgresValueConverterBuilder;
 import static io.debezium.connector.postgresql.Utils.currentOffset;
@@ -94,6 +96,10 @@ public class PostgresDialect implements JdbcDataSourceDialect {
         return jdbc;
     }
 
+    public PostgresConnection openJdbcConnection() {
+        return (PostgresConnection) openJdbcConnection(sourceConfig);
+    }
+
     public PostgresReplicationConnection openPostgresReplicationConnection() {
         try {
             PostgresConnection jdbcConnection =
@@ -215,4 +221,12 @@ public class PostgresDialect implements JdbcDataSourceDialect {
             streamFetchTask.commitCurrentOffset();
         }
     }
+
+    public String getSlotName() {
+        return sourceConfig.getDbzProperties().getProperty(SLOT_NAME.name());
+    }
+
+    public String getPluginName() {
+        return sourceConfig.getDbzProperties().getProperty(PLUGIN_NAME.name());
+    }
 }
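
The new PostgresDialect members are small but load-bearing: openJdbcConnection() gives callers a plain (non-replication) connection, while getSlotName() and getPluginName() surface the configured slot and logical-decoding plugin from the raw Debezium properties. For orientation, a throwaway illustration of what those lookups resolve to, assuming the standard Debezium keys "slot.name" and "plugin.name" (which is what SLOT_NAME.name() and PLUGIN_NAME.name() return); nothing here is part of the commit:

    import java.util.Properties;

    // Standalone demo of the property keys the new accessors read.
    public class SlotPropsDemo {
        public static void main(String[] args) {
            Properties dbzProps = new Properties();
            dbzProps.setProperty("slot.name", "flink");         // key from SLOT_NAME.name()
            dbzProps.setProperty("plugin.name", "decoderbufs"); // key from PLUGIN_NAME.name()
            System.out.println(dbzProps.getProperty("slot.name"));   // flink
            System.out.println(dbzProps.getProperty("plugin.name")); // decoderbufs
        }
    }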

@@ -24,7 +24,11 @@ import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
 import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+
+import java.sql.SQLException;
 
 /**
  * The Postgres source enumerator that enumerates receive the split request and assign the split to
@@ -56,6 +60,24 @@ public class PostgresSourceEnumerator extends IncrementalSourceEnumerator {
      * reading the globalStreamSplit to catch all data changes.
      */
     private void createSlotForGlobalStreamSplit() {
+        SlotState slotInfo = null;
+        try (PostgresConnection connection = postgresDialect.openJdbcConnection()) {
+            slotInfo =
+                    connection.getReplicationSlotState(
+                            postgresDialect.getSlotName(), postgresDialect.getPluginName());
+        } catch (SQLException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Fail to get the replication slot info, the slot name is %s.",
+                            postgresDialect.getSlotName()),
+                    e);
+        }
+
+        // skip creating the replication slot when the slot exists.
+        if (slotInfo != null) {
+            return;
+        }
+
         try {
             PostgresReplicationConnection replicationConnection =
                     postgresDialect.openPostgresReplicationConnection();
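
This hunk is the heart of the fix for #2251: before creating the global stream slot, the enumerator now asks the server whether a slot with the configured name and plugin already exists (getReplicationSlotState returns null when it does not) and skips creation if so, so restarting a job against an existing slot no longer fails with a name conflict. When debugging, the same question can be put to PostgreSQL directly; a minimal JDBC sketch, assuming nothing beyond the standard pg_replication_slots view (this helper is hypothetical, not part of the commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    // Hypothetical debugging helper: checks the catalog the same way the enumerator
    // checks via Debezium's getReplicationSlotState.
    final class SlotProbe {
        static boolean slotExists(String jdbcUrl, String user, String password, String slotName)
                throws Exception {
            try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
                    PreparedStatement ps =
                            conn.prepareStatement(
                                    "SELECT 1 FROM pg_replication_slots WHERE slot_name = ?")) {
                ps.setString(1, slotName);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next(); // a row means the slot already exists
                }
            }
        }
    }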

@ -109,7 +109,7 @@ public class PostgresScanFetchTask implements FetchTask<SourceSplitBase> {
ctx.getDispatcher(), ctx.getDispatcher(),
ctx.getSnapshotChangeEventSourceMetrics(), ctx.getSnapshotChangeEventSourceMetrics(),
split, split,
ctx.getSlotName(), ((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask(),
ctx.getPluginName()); ctx.getPluginName());
SnapshotSplitChangeEventSourceContext changeEventSourceContext = SnapshotSplitChangeEventSourceContext changeEventSourceContext =
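
The backfill reader for a snapshot split used to open its replication connection with the global slot name from ctx.getSlotName(); with several splits (or the global stream reader) active at once, that produced exactly the slot-name conflict this commit fixes. It now takes a dedicated per-task name from the source config. The diff does not include getSlotNameForBackfillTask() itself; a plausible shape, offered purely as an assumption:

    // Hypothetical sketch; the real PostgresSourceConfig implementation is not shown
    // in this diff. The point is a name unique per subtask, so backfill slots cannot
    // collide with the global stream slot or with one another.
    public String getSlotNameForBackfillTask() {
        return getSlotName() + "_" + getSubtaskId();
    }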

@@ -22,8 +22,8 @@ import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
 import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
 import com.ververica.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
 import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
+import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
-import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
 import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
 import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
 import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
@@ -121,6 +121,22 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
     public void configure(SourceSplitBase sourceSplitBase) {
         LOG.debug("Configuring PostgresSourceFetchTaskContext for split: {}", sourceSplitBase);
         PostgresConnectorConfig dbzConfig = getDbzConnectorConfig();
+        if (sourceSplitBase instanceof SnapshotSplit) {
+            dbzConfig =
+                    new PostgresConnectorConfig(
+                            dbzConfig
+                                    .getConfig()
+                                    .edit()
+                                    .with(
+                                            SLOT_NAME.name(),
+                                            ((PostgresSourceConfig) sourceConfig)
+                                                    .getSlotNameForBackfillTask())
+                                    // drop slot for backfill stream split
+                                    .with(DROP_SLOT_ON_STOP.name(), true)
+                                    // Disable heartbeat event in snapshot split fetcher
+                                    .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
+                                    .build());
+        }
 
         PostgresConnectorConfig.SnapshotMode snapshotMode =
                 PostgresConnectorConfig.SnapshotMode.parse(
@ -164,21 +180,7 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
this.taskContext, this.taskContext,
jdbcConnection, jdbcConnection,
this.snapShotter.shouldSnapshot(), this.snapShotter.shouldSnapshot(),
sourceSplitBase instanceof StreamSplit dbzConfig));
? dbzConfig
: new PostgresConnectorConfig(
dbzConfig
.getConfig()
.edit()
.with(
SLOT_NAME.name(),
((PostgresSourceConfig) sourceConfig)
.getSlotNameForBackfillTask())
// drop slot for backfill stream split
.with(DROP_SLOT_ON_STOP.name(), true)
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build())));
this.queue = this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>() new ChangeEventQueue.Builder<DataChangeEvent>()
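
Two things change across these two hunks. First, the backfill override moves out of the EventDispatcher construction and up into configure(), keyed on `instanceof SnapshotSplit` rather than the inverted StreamSplit check, so every later consumer of dbzConfig sees the backfill settings, not only the dispatcher. Second, the override itself: a per-task slot name, slot dropping on stop so the temporary backfill slot is cleaned up, and heartbeats disabled during the snapshot phase. The shape of the override as a standalone sketch, assuming the usual Debezium keys behind SLOT_NAME, DROP_SLOT_ON_STOP, and Heartbeat.HEARTBEAT_INTERVAL; the slot name here is a made-up example:

    import io.debezium.config.Configuration;

    // Debezium Configuration objects are immutable: edit() returns a builder,
    // build() a new Configuration with the overrides applied.
    Configuration overridden =
            originalConfig
                    .edit()
                    .with("slot.name", "flink_backfill_0") // hypothetical per-task slot name
                    .with("slot.drop.on.stop", true)       // drop the temporary slot on stop
                    .with("heartbeat.interval.ms", 0)      // no heartbeats while snapshotting
                    .build();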

@@ -77,8 +77,8 @@ import static org.junit.Assert.fail;
 import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
 
 /** Tests for {@link PostgreSQLSource} which also heavily tests {@link DebeziumSourceFunction}. */
-public class PostgresSQLSourceTest extends PostgresTestBase {
+public class PostgreSQLSourceTest extends PostgresTestBase {
 
-    private static final Logger LOG = LoggerFactory.getLogger(PostgresSQLSourceTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PostgreSQLSourceTest.class);
     private static final String SLOT_NAME = "flink";
     // These tests only passes at the docker postgres:9.6
     private static final PostgreSQLContainer<?> POSTGRES_CONTAINER_OLD =

@@ -16,6 +16,7 @@
 
 package com.ververica.cdc.connectors.postgres;
 
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import io.debezium.config.Configuration;
@@ -137,4 +138,40 @@ public abstract class PostgresTestBase extends AbstractTestBase {
         Configuration config = Configuration.from(properties);
         return new PostgresConnection(JdbcConfiguration.adapt(config), "test-connection");
     }
+
+    protected void waitForSnapshotStarted(String sinkName) throws InterruptedException {
+        while (sinkSize(sinkName) == 0) {
+            Thread.sleep(300);
+        }
+    }
+
+    protected void waitForSinkResult(String sinkName, List<String> expected)
+            throws InterruptedException {
+        List<String> actual = TestValuesTableFactory.getResults(sinkName);
+        actual = actual.stream().sorted().collect(Collectors.toList());
+        while (actual.size() != expected.size() || !actual.equals(expected)) {
+            actual =
+                    TestValuesTableFactory.getResults(sinkName).stream()
+                            .sorted()
+                            .collect(Collectors.toList());
+            Thread.sleep(1000);
+        }
+    }
+
+    protected void waitForSinkSize(String sinkName, int expectedSize) throws InterruptedException {
+        while (sinkSize(sinkName) < expectedSize) {
+            Thread.sleep(100);
+        }
+    }
+
+    protected int sinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getRawResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
 }
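
These polling helpers, previously private to PostgreSQLConnectorITCase, are promoted to the shared test base (the next hunk deletes the originals). One subtlety of waitForSinkResult: it sorts the actual rows before comparing with equals(), so the expected list must be passed pre-sorted. Typical usage in a subclass test, sketched (the test name and the expectedRows list are hypothetical; "sink" is the values-connector table name):

    // Hypothetical subclass test fragment.
    @Test
    public void testSomething() throws Exception {
        waitForSnapshotStarted("sink");          // block until the first row arrives
        waitForSinkSize("sink", 15);             // block until at least 15 change events
        waitForSinkResult("sink", expectedRows); // block until the sorted result matches
    }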

@@ -674,28 +674,4 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
 
         result.getJobClient().get().cancel().get();
     }
-
-    private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
-        while (sinkSize(sinkName) == 0) {
-            Thread.sleep(300);
-        }
-    }
-
-    private static void waitForSinkSize(String sinkName, int expectedSize)
-            throws InterruptedException {
-        while (sinkSize(sinkName) < expectedSize) {
-            Thread.sleep(100);
-        }
-    }
-
-    private static int sinkSize(String sinkName) {
-        synchronized (TestValuesTableFactory.class) {
-            try {
-                return TestValuesTableFactory.getRawResults(sinkName).size();
-            } catch (IllegalArgumentException e) {
-                // job is not started yet
-                return 0;
-            }
-        }
-    }
 }

@@ -0,0 +1,230 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.postgres.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.ververica.cdc.connectors.postgres.PostgresTestBase;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
+
+/** Integration tests for PostgreSQL to start from a savepoint. */
+public class PostgreSQLSavepointITCase extends PostgresTestBase {
+
+    @Before
+    public void before() {
+        TestValuesTableFactory.clearAllData();
+    }
+
+    @Test
+    public void testSavepoint() throws Exception {
+        testRestartFromSavepoint();
+    }
+
+    private void testRestartFromSavepoint() throws Exception {
+        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+
+        final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        temporaryFolder.create();
+        final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
+        String finishedSavePointPath = null;
+
+        StreamExecutionEnvironment env = getStreamExecutionEnvironment(finishedSavePointPath, 4);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE debezium_source ("
+                                + " id INT NOT NULL,"
+                                + " name STRING,"
+                                + " description STRING,"
+                                + " weight DECIMAL(10,3),"
+                                + " PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '2',"
+                                + " 'slot.name' = '%s'"
+                                + ")",
+                        POSTGRES_CONTAINER.getHost(),
+                        POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+                        POSTGRES_CONTAINER.getUsername(),
+                        POSTGRES_CONTAINER.getPassword(),
+                        POSTGRES_CONTAINER.getDatabaseName(),
+                        "inventory",
+                        "products",
+                        getSlotName());
+        String sinkDDL =
+                "CREATE TABLE sink "
+                        + " WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ") LIKE debezium_source (EXCLUDING OPTIONS)";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        // async submit job
+        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+        JobClient jobClient = result.getJobClient().get();
+
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
+            statement.execute(
+                    "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+            statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
+            statement.execute("DELETE FROM inventory.products WHERE id=111;");
+        }
+
+        // wait for the source startup, we don't have a better way to wait it, use sleep for now
+        Thread.sleep(10000L);
+        waitForSinkResult(
+                "sink",
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.140]",
+                        "+I[102, car battery, 12V car battery, 8.100]",
+                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
+                        "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
+                        "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
+                        "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
+                        "+I[107, rocks, box of assorted rocks, 5.300]",
+                        "+I[108, jacket, water resistent black wind breaker, 0.100]",
+                        "+I[109, spare tire, 24 inch spare tire, 22.200]",
+                        "+I[110, jacket, new water resistent white wind breaker, 0.500]"));
+
+        finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+        jobClient.cancel().get();
+
+        env = getStreamExecutionEnvironment(finishedSavePointPath, 4);
+        tEnv = StreamTableEnvironment.create(env);
+
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 112
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
+            statement.execute(
+                    "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=112;");
+            statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=113;");
+            statement.execute("DELETE FROM inventory.products WHERE id=113;");
+        }
+
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        // async submit job
+        result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+        jobClient = result.getJobClient().get();
+        waitForSinkSize("sink", 15);
+
+        String[] expected =
+                new String[] {
+                    "+I[101, scooter, Small 2-wheel scooter, 3.140]",
+                    "+I[102, car battery, 12V car battery, 8.100]",
+                    "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
+                    "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
+                    "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
+                    "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
+                    "+I[107, rocks, box of assorted rocks, 5.300]",
+                    "+I[108, jacket, water resistent black wind breaker, 0.100]",
+                    "+I[109, spare tire, 24 inch spare tire, 22.200]",
+                    "+I[110, jacket, new water resistent white wind breaker, 0.500]",
+                    "+I[112, jacket, new water resistent white wind breaker, 0.500]"
+                };
+        List<String> actual = TestValuesTableFactory.getResults("sink");
+        assertThat(actual, containsInAnyOrder(expected));
+        jobClient.cancel().get();
+
+        temporaryFolder.delete();
+    }
+
+    private StreamExecutionEnvironment getStreamExecutionEnvironment(
+            String finishedSavePointPath, int parallelism) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        if (finishedSavePointPath != null) {
+            // restore from savepoint
+            // hack for test to visit protected TestStreamEnvironment#getConfiguration() method
+            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+            Class<?> clazz =
+                    classLoader.loadClass(
+                            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
+            Field field = clazz.getDeclaredField("configuration");
+            field.setAccessible(true);
+            Configuration configuration = (Configuration) field.get(env);
+            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
+        }
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        return env;
+    }
+
+    private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory)
+            throws ExecutionException, InterruptedException {
+        int retryTimes = 0;
+        // retry 600 times, it takes 100 milliseconds per time, at most retry 1 minute
+        while (retryTimes < 600) {
+            try {
+                return jobClient.triggerSavepoint(savepointDirectory).get();
+            } catch (Exception e) {
+                Optional<CheckpointException> exception =
+                        ExceptionUtils.findThrowable(e, CheckpointException.class);
+                if (exception.isPresent()
+                        && exception.get().getMessage().contains("Checkpoint triggering task")) {
+                    Thread.sleep(100);
+                    retryTimes++;
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return null;
+    }
+}
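
Two details of this new test are worth noting. getStreamExecutionEnvironment reaches into StreamExecutionEnvironment via reflection because, as the inline comment says, the test environment's configuration is not otherwise reachable; and triggerSavepointWithRetry returns null if the job never becomes savepoint-ready within its 600 x 100 ms (one minute) budget, which then flows into the restore run. Outside the MiniCluster test harness, the restore path can be supplied without reflection; a sketch assuming a regular (non-test) Flink environment:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Sketch: pass the savepoint restore path through the environment's
    // Configuration instead of reflective field access.
    Configuration conf = new Configuration();
    conf.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);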