[postgres] Fix postgres e2e test

pull/2205/head
Hang Ruan 2 years ago
parent 960ef134bc
commit 015e9515d3

@ -40,6 +40,7 @@ import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -58,10 +59,8 @@ public class PostgresE2eITCase extends FlinkContainerTestEnvironment {
private static final String INTER_CONTAINER_PG_ALIAS = "postgres";
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
// debezium/postgres:9.6-alpine supports both ARM and AMD architectures
private static final DockerImageName PG_IMAGE =
DockerImageName.parse("debezium/postgres:9.6-alpine")
.asCompatibleSubstituteFor("postgres");
DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres");
private static final Path postgresCdcJar = TestUtils.getResource("postgres-cdc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -83,7 +82,16 @@ public class PostgresE2eITCase extends FlinkContainerTestEnvironment {
.withPassword(PG_TEST_PASSWORD)
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_PG_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
.withLogConsumer(new Slf4jLogConsumer(LOG))
.withCommand(
"postgres",
"-c",
// default
"fsync=off",
"-c",
"max_wal_senders=20",
"-c",
"max_replication_slots=20");
@Before
public void before() {
@ -97,51 +105,12 @@ public class PostgresE2eITCase extends FlinkContainerTestEnvironment {
super.after();
}
List<String> sourceSql =
Arrays.asList(
"SET 'execution.checkpointing.interval' = '3s';",
"CREATE TABLE products_source (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'postgres-cdc',",
" 'hostname' = '" + INTER_CONTAINER_PG_ALIAS + "',",
" 'port' = '" + POSTGRESQL_PORT + "',",
" 'username' = '" + PG_TEST_USER + "',",
" 'password' = '" + PG_TEST_PASSWORD + "',",
" 'database-name' = '" + POSTGRES.getDatabaseName() + "',",
" 'schema-name' = 'inventory',",
" 'table-name' = 'products',",
" 'slot.name' = 'flink',", // dropping the slot allows WAL segments to be
// discarded by the database
" 'debezium.slot.drop_on_stop' = 'true'",
");");
public static String getSlotName(String prefix) {
final Random random = new Random();
int id = random.nextInt(9000);
return prefix + id;
}
List<String> sourceSqlWithIncrementalSnapshot =
Arrays.asList(
"CREATE TABLE products_source (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'postgres-cdc',",
" 'hostname' = '" + INTER_CONTAINER_PG_ALIAS + "',",
" 'port' = '" + POSTGRESQL_PORT + "',",
" 'username' = '" + PG_TEST_USER + "',",
" 'password' = '" + PG_TEST_PASSWORD + "',",
" 'database-name' = '" + POSTGRES.getDatabaseName() + "',",
" 'schema-name' = 'inventory',",
" 'table-name' = 'products',",
" 'slot.name' = 'flink_incremental',",
" 'scan.incremental.snapshot.chunk.size' = '5',",
" 'scan.incremental.snapshot.enabled' = 'true',",
" 'scan.startup.mode' = 'initial'",
");");
List<String> sinkSql =
Arrays.asList(
"CREATE TABLE products_sink (",
@ -170,6 +139,29 @@ public class PostgresE2eITCase extends FlinkContainerTestEnvironment {
statement.execute("ANALYZE;");
}
List<String> sourceSqlWithIncrementalSnapshot =
Arrays.asList(
"CREATE TABLE products_source (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'postgres-cdc',",
" 'hostname' = '" + INTER_CONTAINER_PG_ALIAS + "',",
" 'port' = '" + POSTGRESQL_PORT + "',",
" 'username' = '" + PG_TEST_USER + "',",
" 'password' = '" + PG_TEST_PASSWORD + "',",
" 'database-name' = '" + POSTGRES.getDatabaseName() + "',",
" 'schema-name' = 'inventory',",
" 'table-name' = 'products',",
" 'slot.name' = '" + getSlotName("flink_incremental_") + "',",
" 'scan.incremental.snapshot.chunk.size' = '4',",
" 'scan.incremental.snapshot.enabled' = 'true',",
" 'scan.startup.mode' = 'initial'",
");");
List<String> sqlLines =
Stream.concat(sourceSqlWithIncrementalSnapshot.stream(), sinkSql.stream())
.collect(Collectors.toList());
@ -178,6 +170,29 @@ public class PostgresE2eITCase extends FlinkContainerTestEnvironment {
@Test
public void testPostgresCdcNonIncremental() throws Exception {
List<String> sourceSql =
Arrays.asList(
"SET 'execution.checkpointing.interval' = '3s';",
"CREATE TABLE products_source (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'postgres-cdc',",
" 'hostname' = '" + INTER_CONTAINER_PG_ALIAS + "',",
" 'port' = '" + POSTGRESQL_PORT + "',",
" 'username' = '" + PG_TEST_USER + "',",
" 'password' = '" + PG_TEST_PASSWORD + "',",
" 'database-name' = '" + POSTGRES.getDatabaseName() + "',",
" 'schema-name' = 'inventory',",
" 'table-name' = 'products',",
" 'slot.name' = '" + getSlotName("flink_") + "',",
// dropping the slot allows WAL segments to be
// discarded by the database
" 'debezium.slot.drop.on.stop' = 'true'",
");");
List<String> sqlLines =
Stream.concat(sourceSql.stream(), sinkSql.stream()).collect(Collectors.toList());

Loading…
Cancel
Save