[test][postgres] Close postgres containers after tests and fix container reuse bug (#2394)

(cherry picked from commit 90ed96392c)
pull/2694/head
Hongshun Wang 1 year ago committed by Leonard Xu
parent 45b003d5f8
commit 9205ac2f90

@ -37,6 +37,7 @@ import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -97,6 +98,13 @@ public class PostgreSQLSourceTest extends PostgresTestBase {
LOG.info("Containers are started.");
}
@AfterClass
public static void stopAll() {
LOG.info("Stopping containers...");
POSTGRES_CONTAINER_OLD.stop();
LOG.info("Containers are stopped.");
}
@Before
public void before() {
initializePostgresTable(POSTGRES_CONTAINER_OLD, "inventory");

@ -19,9 +19,11 @@ package com.ververica.cdc.connectors.postgres;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import com.ververica.cdc.connectors.postgres.source.PostgresConnectionPoolFactory;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -83,6 +85,13 @@ public abstract class PostgresTestBase extends AbstractTestBase {
LOG.info("Containers are started.");
}
@AfterClass
public static void stopContainers() {
LOG.info("Stopping containers...");
POSTGRES_CONTAINER.stop();
LOG.info("Containers are stopped.");
}
protected Connection getJdbcConnection(PostgreSQLContainer container) throws SQLException {
return DriverManager.getConnection(
container.getJdbcUrl(), container.getUsername(), container.getPassword());
@ -90,10 +99,14 @@ public abstract class PostgresTestBase extends AbstractTestBase {
public static Connection getJdbcConnection(PostgreSQLContainer container, String databaseName)
throws SQLException {
String jdbcUrl =
String.format(
PostgresConnectionPoolFactory.JDBC_URL_PATTERN,
container.getHost(),
container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
databaseName);
return DriverManager.getConnection(
container.withDatabaseName(databaseName).getJdbcUrl(),
container.getUsername(),
container.getPassword());
jdbcUrl, container.getUsername(), container.getPassword());
}
public static String getSlotName() {

@ -535,25 +535,58 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
// waiting for change events finished.
waitForSinkSize("sink", 16);
String databaseName = POSTGRES_CONTAINER.getDatabaseName();
List<String> expected =
Arrays.asList(
"+I(postgres,inventory,products,101,scooter,Small 2-wheel scooter,3.140)",
"+I(postgres,inventory,products,102,car battery,12V car battery,8.100)",
"+I(postgres,inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
"+I(postgres,inventory,products,104,hammer,12oz carpenter's hammer,0.750)",
"+I(postgres,inventory,products,105,hammer,14oz carpenter's hammer,0.875)",
"+I(postgres,inventory,products,106,hammer,16oz carpenter's hammer,1.000)",
"+I(postgres,inventory,products,107,rocks,box of assorted rocks,5.300)",
"+I(postgres,inventory,products,108,jacket,water resistent black wind breaker,0.100)",
"+I(postgres,inventory,products,109,spare tire,24 inch spare tire,22.200)",
"+I(postgres,inventory,products,110,jacket,water resistent white wind breaker,0.200)",
"+I(postgres,inventory,products,111,scooter,Big 2-wheel scooter ,5.180)",
"+U(postgres,inventory,products,106,hammer,18oz carpenter hammer,1.000)",
"+U(postgres,inventory,products,107,rocks,box of assorted rocks,5.100)",
"+U(postgres,inventory,products,110,jacket,new water resistent white wind breaker,0.500)",
"+U(postgres,inventory,products,111,scooter,Big 2-wheel scooter ,5.170)",
"-D(postgres,inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
"+I("
+ databaseName
+ ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)",
"+I("
+ databaseName
+ ",inventory,products,102,car battery,12V car battery,8.100)",
"+I("
+ databaseName
+ ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
"+I("
+ databaseName
+ ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)",
"+I("
+ databaseName
+ ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)",
"+I("
+ databaseName
+ ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)",
"+I("
+ databaseName
+ ",inventory,products,107,rocks,box of assorted rocks,5.300)",
"+I("
+ databaseName
+ ",inventory,products,108,jacket,water resistent black wind breaker,0.100)",
"+I("
+ databaseName
+ ",inventory,products,109,spare tire,24 inch spare tire,22.200)",
"+I("
+ databaseName
+ ",inventory,products,110,jacket,water resistent white wind breaker,0.200)",
"+I("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)",
"+U("
+ databaseName
+ ",inventory,products,106,hammer,18oz carpenter hammer,1.000)",
"+U("
+ databaseName
+ ",inventory,products,107,rocks,box of assorted rocks,5.100)",
"+U("
+ databaseName
+ ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)",
"+U("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)",
"-D("
+ databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
Collections.sort(actual);
Collections.sort(expected);

Loading…
Cancel
Save