getLivenessCheckPortNumbers() {
+ return Collections.singleton(this.getMappedPort(SQL_PORT));
+ }
+
+ @Override
+ public String getDriverClassName() {
+ return "com.mysql.cj.jdbc.Driver";
+ }
+
+ public String getJdbcUrl(String databaseName) {
+ return "jdbc:mysql://"
+ + getHost()
+ + ":"
+ + getDatabasePort()
+ + "/"
+ + databaseName
+ + "?useSSL=false";
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return getJdbcUrl("");
+ }
+
+ public int getDatabasePort() {
+ return getMappedPort(SQL_PORT);
+ }
+
+ public String getTenantName() {
+ return tenantName;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return TEST_DATABASE;
+ }
+
+ @Override
+ public String getUsername() {
+ return ROOT_USER + "@" + tenantName;
+ }
+
+ @Override
+ public String getPassword() {
+ return tenantPassword;
+ }
+
+ @Override
+ protected String getTestQueryString() {
+ return "SELECT 1";
+ }
+
+ public OceanBaseContainer withMode(String mode) {
+ this.mode = mode;
+ return this;
+ }
+
+ public OceanBaseContainer withTenantName(String tenantName) {
+ this.tenantName = tenantName;
+ return this;
+ }
+
+ public OceanBaseContainer withSysPassword(String sysPassword) {
+ this.sysPassword = sysPassword;
+ return this;
+ }
+
+ public OceanBaseContainer withTenantPassword(String tenantPassword) {
+ this.tenantPassword = tenantPassword;
+ return this;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java
new file mode 100644
index 000000000..aede8a292
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseMySQLCdcMetadata.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/** OceanBase CDC MySQL mode metadata. */
+public class OceanBaseMySQLCdcMetadata implements OceanBaseCdcMetadata {
+
+ private final OceanBaseContainer obServerContainer;
+ private final LogProxyContainer logProxyContainer;
+
+ private String rsList;
+
+ public OceanBaseMySQLCdcMetadata(
+ OceanBaseContainer obServerContainer, LogProxyContainer logProxyContainer) {
+ this.obServerContainer = obServerContainer;
+ this.logProxyContainer = logProxyContainer;
+ }
+
+ @Override
+ public String getCompatibleMode() {
+ return "mysql";
+ }
+
+ @Override
+ public String getHostname() {
+ return obServerContainer.getHost();
+ }
+
+ @Override
+ public int getPort() {
+ return obServerContainer.getDatabasePort();
+ }
+
+ @Override
+ public String getUsername() {
+ return obServerContainer.getUsername();
+ }
+
+ @Override
+ public String getPassword() {
+ return obServerContainer.getPassword();
+ }
+
+ @Override
+ public String getDriverClass() {
+ return obServerContainer.getDriverClassName();
+ }
+
+ @Override
+ public String getDatabase() {
+ return obServerContainer.getDatabaseName();
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return "jdbc:mysql://" + getHostname() + ":" + getPort() + "/?useSSL=false";
+ }
+
+ @Override
+ public String getTenantName() {
+ return obServerContainer.getTenantName();
+ }
+
+ @Override
+ public String getLogProxyHost() {
+ return logProxyContainer.getHost();
+ }
+
+ @Override
+ public int getLogProxyPort() {
+ return logProxyContainer.getPort();
+ }
+
+ @Override
+ public String getRsList() {
+ if (rsList == null) {
+ try (Connection connection =
+ DriverManager.getConnection(
+ getJdbcUrl(), getUsername(), getPassword());
+ Statement statement = connection.createStatement()) {
+ ResultSet rs = statement.executeQuery("SHOW PARAMETERS LIKE 'rootservice_list'");
+ rsList = rs.next() ? rs.getString("VALUE") : null;
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to query rs list", e);
+ }
+ if (rsList == null) {
+ throw new RuntimeException("Got empty rs list");
+ }
+ }
+ return rsList;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java
new file mode 100644
index 000000000..c68fdda03
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/OceanBaseOracleCdcMetadata.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+/** OceanBase CDC Oracle mode metadata. */
+public class OceanBaseOracleCdcMetadata implements OceanBaseCdcMetadata {
+
+ @Override
+ public String getCompatibleMode() {
+ return "oracle";
+ }
+
+ @Override
+ public String getHostname() {
+ return System.getenv("host");
+ }
+
+ @Override
+ public int getPort() {
+ return Integer.parseInt(System.getenv("port"));
+ }
+
+ @Override
+ public String getUsername() {
+ return System.getenv("username");
+ }
+
+ @Override
+ public String getPassword() {
+ return System.getenv("password");
+ }
+
+ @Override
+ public String getDatabase() {
+ return System.getenv("schema");
+ }
+
+ @Override
+ public String getDriverClass() {
+ return "com.oceanbase.jdbc.Driver";
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return "jdbc:oceanbase://" + getHostname() + ":" + getPort() + "/" + getDatabase();
+ }
+
+ @Override
+ public String getTenantName() {
+ return System.getenv("tenant");
+ }
+
+ @Override
+ public String getLogProxyHost() {
+ return System.getenv("log_proxy_host");
+ }
+
+ @Override
+ public int getLogProxyPort() {
+ return Integer.parseInt(System.getenv("log_proxy_port"));
+ }
+
+ @Override
+ public String getConfigUrl() {
+ return System.getenv("config_url");
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java
new file mode 100644
index 000000000..d189d030f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/testutils/UniqueDatabase.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.testutils;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Create and populate a unique instance of an OceanBase database for each run of JUnit test. A user
+ * of class needs to provide a logical name for Debezium and database name. It is expected that
+ * there is an init file in src/test/resources/ddl/<database_name>.sql
. The
+ * database name is enriched with a unique suffix that guarantees complete isolation between runs
+ *
+ * <database_name>_<suffix>
+ *
+ * This class is inspired from Debezium project.
+ */
+public class UniqueDatabase {
+
+ private static final String[] CREATE_DATABASE_DDL =
+ new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"};
+ private static final String DROP_DATABASE_DDL = "DROP DATABASE IF EXISTS `$DBNAME$`;";
+ private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+
+ private final OceanBaseContainer container;
+ private final String databaseName;
+ private final String templateName;
+
+ public UniqueDatabase(OceanBaseContainer container, String databaseName) {
+ this(container, databaseName, Integer.toUnsignedString(new Random().nextInt(), 36));
+ }
+
+ private UniqueDatabase(
+ OceanBaseContainer container, String databaseName, final String identifier) {
+ this.container = container;
+ this.databaseName = databaseName + "_" + identifier;
+ this.templateName = databaseName;
+ }
+
+ public String getHost() {
+ return container.getHost();
+ }
+
+ public int getDatabasePort() {
+ return container.getDatabasePort();
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public String getUsername() {
+ return container.getUsername();
+ }
+
+ public String getPassword() {
+ return container.getPassword();
+ }
+
+ /** @return Fully qualified table name <databaseName>.<tableName>
*/
+ public String qualifiedTableName(final String tableName) {
+ return String.format("%s.%s", databaseName, tableName);
+ }
+
+ /** Creates the database and populates it with initialization SQL script. */
+ public void createAndInitialize() {
+ final String ddlFile = String.format("ddl/%s.sql", templateName);
+ final URL ddlTestFile = UniqueDatabase.class.getClassLoader().getResource(ddlFile);
+ assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+ try {
+ try (Connection connection =
+ DriverManager.getConnection(
+ container.getJdbcUrl(), getUsername(), getPassword());
+ Statement statement = connection.createStatement()) {
+ final List statements =
+ Arrays.stream(
+ Stream.concat(
+ Arrays.stream(CREATE_DATABASE_DDL),
+ Files.readAllLines(
+ Paths.get(ddlTestFile.toURI()))
+ .stream())
+ .map(String::trim)
+ .filter(x -> !x.startsWith("--") && !x.isEmpty())
+ .map(
+ x -> {
+ final Matcher m =
+ COMMENT_PATTERN.matcher(x);
+ return m.matches() ? m.group(1) : x;
+ })
+ .map(this::convertSQL)
+ .collect(Collectors.joining("\n"))
+ .split(";"))
+ .map(x -> x.replace("$$", ";"))
+ .collect(Collectors.toList());
+ for (String stmt : statements) {
+ statement.execute(stmt);
+ }
+ }
+ } catch (final Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /** Drop the database if it is existing. */
+ public void dropDatabase() {
+ try {
+ try (Connection connection =
+ DriverManager.getConnection(
+ container.getJdbcUrl(), getUsername(), getPassword());
+ Statement statement = connection.createStatement()) {
+ final String dropDatabaseStatement = convertSQL(DROP_DATABASE_DDL);
+ statement.execute(dropDatabaseStatement);
+ }
+ } catch (final Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ container.getJdbcUrl(databaseName), getUsername(), getPassword());
+ }
+
+ private String convertSQL(final String sql) {
+ return sql.replace("$DBNAME$", databaseName);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql
deleted file mode 100644
index 0db9c71db..000000000
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/resources/ddl/mysql/docker_init.sql
+++ /dev/null
@@ -1,17 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements. See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- Set the root user password of test tenant
-ALTER USER root IDENTIFIED BY '123456';
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index d1c0bb7e7..72576bf17 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -102,7 +102,6 @@ public abstract class PipelineTestEnvironment extends TestLogger {
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
- .withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", flinkProperties)
@@ -111,7 +110,6 @@ public abstract class PipelineTestEnvironment extends TestLogger {
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
- .withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", flinkProperties)
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index c1e50101a..cdb0b43e8 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -91,6 +91,13 @@ limitations under the License.
test-jar
test
+
+ org.apache.flink
+ flink-connector-oceanbase-cdc
+ ${project.version}
+ test-jar
+ test
+
org.apache.flink
flink-connector-oracle-cdc
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
index e257311a5..2cd2f88eb 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
@@ -19,99 +19,68 @@ package org.apache.flink.cdc.connectors.tests;
import org.apache.flink.cdc.common.test.utils.JdbcProxy;
import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.LogProxyContainer;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseContainer;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseMySQLCdcMetadata;
+import org.apache.flink.cdc.connectors.oceanbase.testutils.UniqueDatabase;
import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.MountableFile;
-
-import java.net.URL;
-import java.nio.file.Files;
+
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import static org.junit.Assert.assertNotNull;
+import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createLogProxyContainer;
+import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC;
/** End-to-end tests for oceanbase-cdc connector uber jar. */
public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
- private static final Logger LOG = LoggerFactory.getLogger(OceanBaseE2eITCase.class);
-
- private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+ private static final String INTER_CONTAINER_OB_SERVER_ALIAS = "oceanbase";
+ private static final String INTER_CONTAINER_LOG_PROXY_ALIAS = "oblogproxy";
private static final Path obCdcJar = TestUtils.getResource("oceanbase-cdc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
- // ------------------------------------------------------------------------------------------
- // OceanBase container variables
- // ------------------------------------------------------------------------------------------
- private static final String OB_SERVER_IMAGE = "oceanbase/oceanbase-ce:4.2.0.0";
- private static final String OB_LOG_PROXY_IMAGE = "whhe/oblogproxy:1.1.3_4x";
- private static final String NETWORK_MODE = "host";
- private static final String INTER_CONTAINER_OB_HOST = "host.docker.internal";
- private static final String SYS_PASSWORD = "1234567";
- private static final String TEST_TENANT = "test";
- private static final String TEST_USER = "root@" + TEST_TENANT;
- private static final String TEST_PASSWORD = "7654321";
-
@ClassRule
- public static final GenericContainer> OB_SERVER =
- new GenericContainer<>(OB_SERVER_IMAGE)
- .withNetworkMode(NETWORK_MODE)
- .withEnv("MODE", "slim")
- .withEnv("OB_DATAFILE_SIZE", "1G")
- .withEnv("OB_LOG_DISK_SIZE", "4G")
- .withEnv("OB_ROOT_PASSWORD", SYS_PASSWORD)
- .withEnv("OB_TENANT_NAME", TEST_TENANT)
- .withCopyFileToContainer(
- MountableFile.forClasspathResource("docker/oceanbase/setup.sql"),
- "/root/boot/init.d/init.sql")
- .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
- .withStartupTimeout(Duration.ofMinutes(3))
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+ public static final OceanBaseContainer OB_SERVER =
+ createOceanBaseContainerForCDC()
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_OB_SERVER_ALIAS);
@ClassRule
- public static final GenericContainer> LOG_PROXY =
- new GenericContainer<>(OB_LOG_PROXY_IMAGE)
- .withNetworkMode(NETWORK_MODE)
- .withEnv("OB_SYS_PASSWORD", SYS_PASSWORD)
- .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
- .withStartupTimeout(Duration.ofMinutes(1))
- .withLogConsumer(new Slf4jLogConsumer(LOG));
+ public static final LogProxyContainer LOG_PROXY =
+ createLogProxyContainer()
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_LOG_PROXY_ALIAS);
+
+ private static final OceanBaseCdcMetadata METADATA =
+ new OceanBaseMySQLCdcMetadata(OB_SERVER, LOG_PROXY);
+
+ protected final UniqueDatabase obInventoryDatabase =
+ new UniqueDatabase(OB_SERVER, "oceanbase_inventory");
@Before
public void before() {
super.before();
- initializeTable("oceanbase_inventory");
+ obInventoryDatabase.createAndInitialize();
}
- private Connection getTestConnection(String databaseName) {
- try {
- Class.forName(MYSQL_DRIVER_CLASS);
- return DriverManager.getConnection(
- String.format("jdbc:mysql://127.0.0.1:2881/%s?useSSL=false", databaseName),
- TEST_USER,
- TEST_PASSWORD);
- } catch (Exception e) {
- throw new RuntimeException("Failed to get test jdbc connection", e);
- }
+ @After
+ public void after() {
+ super.after();
+
+ obInventoryDatabase.dropDatabase();
}
@Test
@@ -131,16 +100,18 @@ public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
") WITH (",
" 'connector' = 'oceanbase-cdc',",
" 'scan.startup.mode' = 'initial',",
- " 'username' = '" + TEST_USER + "',",
- " 'password' = '" + TEST_PASSWORD + "',",
- " 'tenant-name' = '" + TEST_TENANT + "',",
- " 'table-list' = 'inventory.products_source',",
- " 'hostname' = '" + INTER_CONTAINER_OB_HOST + "',",
+ " 'username' = '" + METADATA.getUsername() + "',",
+ " 'password' = '" + METADATA.getPassword() + "',",
+ " 'tenant-name' = '" + METADATA.getTenantName() + "',",
+ " 'table-list' = '"
+ + obInventoryDatabase.qualifiedTableName("products_source")
+ + "',",
+ " 'hostname' = '" + INTER_CONTAINER_OB_SERVER_ALIAS + "',",
" 'port' = '2881',",
- " 'jdbc.driver' = '" + MYSQL_DRIVER_CLASS + "',",
- " 'logproxy.host' = '" + INTER_CONTAINER_OB_HOST + "',",
+ " 'jdbc.driver' = '" + METADATA.getDriverClass() + "',",
+ " 'logproxy.host' = '" + INTER_CONTAINER_LOG_PROXY_ALIAS + "',",
" 'logproxy.port' = '2983',",
- " 'rootserver-list' = '127.0.0.1:2882:2881',",
+ " 'rootserver-list' = '" + METADATA.getRsList() + "',",
" 'working-mode' = 'memory',",
" 'jdbc.properties.useSSL' = 'false'",
");",
@@ -168,7 +139,7 @@ public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
submitSQLJob(sqlLines, obCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
- try (Connection conn = getTestConnection("inventory");
+ try (Connection conn = obInventoryDatabase.getJdbcConnection();
Statement stat = conn.createStatement()) {
stat.execute(
"UPDATE products_source SET description='18oz carpenter hammer' WHERE id=106;");
@@ -211,32 +182,4 @@ public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
new String[] {"id", "name", "description", "weight", "enum_c", "json_c"},
60000L);
}
-
- protected void initializeTable(String sqlFile) {
- final String ddlFile = String.format("ddl/%s.sql", sqlFile);
- final URL ddlTestFile = OceanBaseE2eITCase.class.getClassLoader().getResource(ddlFile);
- assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
- try (Connection connection = getTestConnection("");
- Statement statement = connection.createStatement()) {
- final List statements =
- Arrays.stream(
- Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
- .map(String::trim)
- .filter(x -> !x.startsWith("--") && !x.isEmpty())
- .map(
- x -> {
- final Matcher m =
- COMMENT_PATTERN.matcher(x);
- return m.matches() ? m.group(1) : x;
- })
- .collect(Collectors.joining("\n"))
- .split(";"))
- .collect(Collectors.toList());
- for (String stmt : statements) {
- statement.execute(stmt);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
}
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
index 91e7e5c6f..49ef039e0 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java
@@ -135,7 +135,6 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
- .withExtraHost("host.docker.internal", "host-gateway")
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", flinkProperties)
@@ -143,7 +142,6 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger {
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
- .withExtraHost("host.docker.internal", "host-gateway")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", flinkProperties)
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
index 6afe82d87..9c4ec5969 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
@@ -16,13 +16,8 @@
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------
--- Create and populate our products using a single insert with many rows
-DROP DATABASE IF EXISTS inventory;
-
-CREATE DATABASE inventory;
-
-USE inventory;
+-- Create and populate our products using a single insert with many rows
CREATE TABLE products_source (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql
deleted file mode 100644
index ac15e8cb9..000000000
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/docker/oceanbase/setup.sql
+++ /dev/null
@@ -1,16 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements. See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
-ALTER USER root IDENTIFIED BY '7654321';