From f4f3a4092a50a6dcf9bb0704ec45087b5467aadb Mon Sep 17 00:00:00 2001 From: gongzhongqiang <764629910@qq.com> Date: Wed, 18 Oct 2023 23:03:12 +0800 Subject: [PATCH] [oracle] Refactor oracle unit test --- docs/content/connectors/oracle-cdc.md | 9 +- docs/content/quickstart/oracle-tutorial.md | 6 +- .../快速上手/oracle-tutorial-zh.md | 6 +- .../cdc/connectors/tests/OracleE2eITCase.java | 40 +-- flink-connector-oracle-cdc/pom.xml | 9 +- .../OracleChangeEventSourceExampleTest.java | 48 ++-- .../connectors/oracle/OracleSourceTest.java | 70 +++--- .../oracle/source/OracleSourceITCase.java | 71 +----- .../oracle/table/OracleConnectorITCase.java | 227 +++++------------- .../oracle/utils/OracleTestUtils.java | 104 -------- 10 files changed, 142 insertions(+), 448 deletions(-) delete mode 100644 flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/utils/OracleTestUtils.java diff --git a/docs/content/connectors/oracle-cdc.md b/docs/content/connectors/oracle-cdc.md index fc89cc130..16eb6822f 100644 --- a/docs/content/connectors/oracle-cdc.md +++ b/docs/content/connectors/oracle-cdc.md @@ -209,7 +209,7 @@ Flink SQL> CREATE TABLE products ( 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', - 'database-name' = 'XE', + 'database-name' = 'ORCLCDB', 'schema-name' = 'inventory', 'table-name' = 'products'); @@ -436,7 +436,7 @@ CREATE TABLE products ( 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', - 'database-name' = 'XE', + 'database-name' = 'ORCLCDB', 'schema-name' = 'inventory', 'table-name' = 'products', 'debezium.log.mining.strategy' = 'online_catalog', @@ -491,13 +491,12 @@ public class OracleParallelSourceExample { public static void main(String[] args) throws Exception { Properties debeziumProperties = new Properties(); debeziumProperties.setProperty("log.mining.strategy", "online_catalog"); - debeziumProperties.setProperty("log.mining.continuous.mine", "true"); JdbcIncrementalSource oracleChangeEventSource = new OracleSourceBuilder() .hostname("host") .port(1521) - .databaseList("XE") + .databaseList("ORCLCDB") .schemaList("DEBEZIUM") .tableList("DEBEZIUM.PRODUCTS") .username("username") @@ -538,7 +537,7 @@ public class OracleSourceExample { SourceFunction sourceFunction = OracleSource.builder() .url("jdbc:oracle:thin:@{hostname}:{port}:{database}") .port(1521) - .database("XE") // monitor XE database + .database("ORCLCDB") // monitor XE database .schemaList("inventory") // monitor inventory schema .tableList("inventory.products") // monitor products table .username("flinkuser") diff --git a/docs/content/quickstart/oracle-tutorial.md b/docs/content/quickstart/oracle-tutorial.md index 6750ada6b..bc93a6162 100644 --- a/docs/content/quickstart/oracle-tutorial.md +++ b/docs/content/quickstart/oracle-tutorial.md @@ -6,7 +6,7 @@ version: '2.1' services: oracle: - image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0 + image: goodboy008/oracle-19.3.0-ee:non-cdb ports: - "1521:1521" elasticsearch: @@ -77,7 +77,7 @@ Flink SQL> CREATE TABLE products ( 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', - 'database-name' = 'XE', + 'database-name' = 'ORCLCDB', 'schema-name' = 'flinkuser', 'table-name' = 'products' ); @@ -95,7 +95,7 @@ Flink SQL> CREATE TABLE orders ( 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', - 'database-name' = 'XE', + 'database-name' = 'ORCLCDB', 'schema-name' = 'flinkuser', 'table-name' = 'orders' ); diff --git a/docs/content/快速上手/oracle-tutorial-zh.md b/docs/content/快速上手/oracle-tutorial-zh.md index 0cee30184..e1ad4f7ee 100644 --- a/docs/content/快速上手/oracle-tutorial-zh.md +++ b/docs/content/快速上手/oracle-tutorial-zh.md @@ -6,7 +6,7 @@ version: '2.1' services: oracle: - image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0 + image: goodboy008/oracle-19.3.0-ee:non-cdb ports: - "1521:1521" elasticsearch: @@ -75,7 +75,7 @@ Flink SQL> CREATE TABLE products ( 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', - 'database-name' = 'XE', + 'database-name' = 'ORCLCDB', 'schema-name' = 'flinkuser', 'table-name' = 'products' ); @@ -93,7 +93,7 @@ Flink SQL> CREATE TABLE orders ( 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', - 'database-name' = 'XE', + 'database-name' = 'ORCLCDB', 'schema-name' = 'flinkuser', 'table-name' = 'orders' ); diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java index e7a53396e..54d503d46 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import org.testcontainers.containers.OracleContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; import java.nio.file.Path; import java.sql.Connection; @@ -39,21 +40,21 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Stream; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_PWD; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_USER; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.ORACLE_DATABASE; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_PWD; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_USER; + /** End-to-end tests for oracle-cdc connector uber jar. */ public class OracleE2eITCase extends FlinkContainerTestEnvironment { private static final Logger LOG = LoggerFactory.getLogger(OracleE2eITCase.class); - private static final String ORACLE_SYSTEM_USER = "system"; - private static final String ORACLE_SYSTEM_PASSWORD = "oracle"; - private static final String ORACLE_TEST_USER = "dbzuser"; - private static final String ORACLE_TEST_PASSWORD = "dbz"; private static final String ORACLE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver"; private static final String INTER_CONTAINER_ORACLE_ALIAS = "oracle"; - private static final String ORACLE_IMAGE = "jark/oracle-xe-11g-r2-cdc:0.1"; - private static final int ORACLE_PORT = 1521; - private static final Path oracleCdcJar = TestUtils.getResource("oracle-cdc-connector.jar"); private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + public static final String ORACLE_IMAGE = "goodboy008/oracle-19.3.0-ee"; private static OracleContainer oracle; @Before @@ -62,10 +63,15 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment { LOG.info("Starting containers..."); oracle = - new OracleContainer(ORACLE_IMAGE) + new OracleContainer(DockerImageName.parse(ORACLE_IMAGE).withTag("non-cdb")) + .withUsername(CONNECTOR_USER) + .withPassword(CONNECTOR_PWD) + .withDatabaseName(ORACLE_DATABASE) .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_ORACLE_ALIAS) - .withLogConsumer(new Slf4jLogConsumer(LOG)); + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .withReuse(true); + Startables.deepStart(Stream.of(oracle)).join(); LOG.info("Containers are started."); } @@ -101,15 +107,14 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment { " primary key (`ID`) not enforced", ") WITH (", " 'connector' = 'oracle-cdc',", - " 'hostname' = '" + INTER_CONTAINER_ORACLE_ALIAS + "',", - " 'port' = '" + ORACLE_PORT + "',", - " 'username' = '" + ORACLE_SYSTEM_USER + "',", - " 'password' = '" + ORACLE_SYSTEM_PASSWORD + "',", - " 'database-name' = 'XE',", + " 'hostname' = '" + oracle.getNetworkAliases().get(0) + "',", + " 'port' = '" + oracle.getExposedPorts().get(0) + "',", + " 'username' = '" + CONNECTOR_USER + "',", + " 'password' = '" + CONNECTOR_PWD + "',", + " 'database-name' = 'ORCLCDB',", " 'schema-name' = 'DEBEZIUM',", " 'scan.incremental.snapshot.enabled' = 'true',", " 'debezium.log.mining.strategy' = 'online_catalog',", - " 'debezium.log.mining.continuous.mine' = 'true',", " 'table-name' = 'PRODUCTS',", " 'scan.incremental.snapshot.chunk.size' = '4'", ");", @@ -135,7 +140,7 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment { submitSQLJob(sqlLines, oracleCdcJar, jdbcJar, mysqlDriverJar); waitUntilJobRunning(Duration.ofSeconds(30)); - // generate binlogs + // generate redo log Class.forName(ORACLE_DRIVER_CLASS); // we need to set this property, otherwise Azure Pipeline will complain // "ORA-01882: timezone region not found" error when building the Oracle JDBC connection @@ -189,7 +194,6 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment { } private Connection getOracleJdbcConnection() throws SQLException { - return DriverManager.getConnection( - oracle.getJdbcUrl(), ORACLE_TEST_USER, ORACLE_TEST_PASSWORD); + return DriverManager.getConnection(oracle.getJdbcUrl(), TEST_USER, TEST_PWD); } } diff --git a/flink-connector-oracle-cdc/pom.xml b/flink-connector-oracle-cdc/pom.xml index 77923edeb..c5064e77f 100644 --- a/flink-connector-oracle-cdc/pom.xml +++ b/flink-connector-oracle-cdc/pom.xml @@ -21,6 +21,9 @@ under the License. com.ververica 2.4-SNAPSHOT + + 19.3.0.0 + 4.0.0 flink-connector-oracle-cdc @@ -88,7 +91,7 @@ under the License. com.oracle.database.xml xdb - 19.3.0.0 + ${xdb.version} @@ -150,14 +153,14 @@ under the License. com.jayway.jsonpath json-path - 2.4.0 + ${json-path.version} test org.apache.commons commons-lang3 - 3.7 + ${commons-lang3.version} diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleChangeEventSourceExampleTest.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleChangeEventSourceExampleTest.java index f40c555e1..c75816f12 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleChangeEventSourceExampleTest.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleChangeEventSourceExampleTest.java @@ -26,32 +26,24 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder; -import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils; +import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; -import org.junit.After; -import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.OracleContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import java.util.Properties; -import java.util.stream.Stream; /** Example Tests for {@link JdbcIncrementalSource}. */ -public class OracleChangeEventSourceExampleTest { +public class OracleChangeEventSourceExampleTest extends OracleSourceTestBase { private static final Logger LOG = LoggerFactory.getLogger(OracleChangeEventSourceExampleTest.class); private static final int DEFAULT_PARALLELISM = 4; private static final long DEFAULT_CHECKPOINT_INTERVAL = 1000; - private static final OracleContainer oracleContainer = - OracleTestUtils.ORACLE_CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)); @Rule public final MiniClusterWithClientResource miniClusterResource = @@ -64,40 +56,30 @@ public class OracleChangeEventSourceExampleTest { .withHaLeadershipControl() .build()); - @BeforeClass - public static void startContainers() { - LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(oracleContainer)).join(); - LOG.info("Containers are started."); - } - - @After - public void teardown() { - oracleContainer.stop(); - } - @Test @Ignore("Test ignored because it won't stop and is used for manual test") public void testConsumingAllEvents() throws Exception { + + createAndInitialize("product.sql"); + LOG.info( "getOraclePort:{},getUsername:{},getPassword:{}", - oracleContainer.getOraclePort(), - oracleContainer.getUsername(), - oracleContainer.getPassword()); + ORACLE_CONTAINER.getOraclePort(), + ORACLE_CONTAINER.getUsername(), + ORACLE_CONTAINER.getPassword()); Properties debeziumProperties = new Properties(); debeziumProperties.setProperty("log.mining.strategy", "online_catalog"); - debeziumProperties.setProperty("log.mining.continuous.mine", "true"); JdbcIncrementalSource oracleChangeEventSource = - new OracleSourceBuilder() - .hostname(oracleContainer.getHost()) - .port(oracleContainer.getOraclePort()) - .databaseList("XE") - .schemaList("DEBEZIUM") + new OracleSourceBuilder() + .hostname(ORACLE_CONTAINER.getHost()) + .port(ORACLE_CONTAINER.getOraclePort()) + .databaseList(ORACLE_DATABASE) + .schemaList(ORACLE_SCHEMA) .tableList("DEBEZIUM.PRODUCTS") - .username(oracleContainer.getUsername()) - .password(oracleContainer.getPassword()) + .username(ORACLE_CONTAINER.getUsername()) + .password(ORACLE_CONTAINER.getPassword()) .deserializer(new JsonDebeziumDeserializationSchema()) .includeSchemaChanges(true) // output the schema changes as well .startupOptions(StartupOptions.initial()) diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleSourceTest.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleSourceTest.java index bb0757892..2ef77def2 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleSourceTest.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/OracleSourceTest.java @@ -29,25 +29,19 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; -import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; import com.jayway.jsonpath.JsonPath; -import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils; +import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase; import com.ververica.cdc.connectors.utils.TestSourceContext; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.DebeziumSourceFunction; import org.apache.kafka.connect.source.SourceRecord; -import org.junit.After; -import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.OracleContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import java.nio.charset.StandardCharsets; import java.sql.Connection; @@ -64,7 +58,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; import static com.ververica.cdc.connectors.utils.AssertUtils.assertDelete; import static com.ververica.cdc.connectors.utils.AssertUtils.assertInsert; @@ -75,34 +68,21 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** Tests for {@link OracleSource} which also heavily tests {@link DebeziumSourceFunction}. */ -public class OracleSourceTest extends AbstractTestBase { +public class OracleSourceTest extends OracleSourceTestBase { private static final Logger LOG = LoggerFactory.getLogger(OracleSourceTest.class); - private OracleContainer oracleContainer = - OracleTestUtils.ORACLE_CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)); - - @Before - public void before() throws Exception { - - LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(oracleContainer)).join(); - LOG.info("Containers are started."); - } - - @After - public void teardown() { - oracleContainer.stop(); - } - @Test public void testConsumingAllEvents() throws Exception { + + createAndInitialize("product.sql"); + DebeziumSourceFunction source = createOracleLogminerSource(); TestSourceContext sourceContext = new TestSourceContext<>(); setupSource(source); - try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer); + try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { // start the source @@ -169,6 +149,9 @@ public class OracleSourceTest extends AbstractTestBase { @Test @Ignore("It can be open until DBZ-5245 and DBZ-4936 fix") public void testCheckpointAndRestore() throws Exception { + + createAndInitialize("product.sql"); + final TestingListState offsetState = new TestingListState<>(); final TestingListState historyState = new TestingListState<>(); { @@ -249,7 +232,7 @@ public class OracleSourceTest extends AbstractTestBase { // make sure there is no more events assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2)); - try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer); + try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { statement.execute( @@ -309,7 +292,7 @@ public class OracleSourceTest extends AbstractTestBase { assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext3)); // can continue to receive new events - try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer); + try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { statement.execute("DELETE FROM debezium.products WHERE id=1001"); } @@ -373,11 +356,13 @@ public class OracleSourceTest extends AbstractTestBase { @Test @Ignore("Debezium Oracle connector don't monitor unknown tables since 1.6, see DBZ-3612") public void testRecoverFromRenameOperation() throws Exception { + + createAndInitialize("product.sql"); final TestingListState offsetState = new TestingListState<>(); final TestingListState historyState = new TestingListState<>(); { - try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer); + try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { // Step-1: start the source from empty state final DebeziumSourceFunction source = createOracleLogminerSource(); @@ -450,7 +435,7 @@ public class OracleSourceTest extends AbstractTestBase { // make sure there is no more events assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2)); - try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer); + try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { statement.execute( "INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (113,'Airplane','Toy airplane',1.304)"); // 113 @@ -466,6 +451,8 @@ public class OracleSourceTest extends AbstractTestBase { @Test public void testConsumingEmptyTable() throws Exception { + + createAndInitialize("product.sql"); final TestingListState offsetState = new TestingListState<>(); final TestingListState historyState = new TestingListState<>(); int prevPos = 0; @@ -474,9 +461,7 @@ public class OracleSourceTest extends AbstractTestBase { // Step-1: start the source from empty state // --------------------------------------------------------------------------- DebeziumSourceFunction source = - basicSourceBuilder(oracleContainer) - .tableList("debezium" + "." + "category") - .build(); + basicSourceBuilder().tableList("debezium.category").build(); // we use blocking context to block the source to emit before last snapshot record final BlockingSourceContext sourceContext = new BlockingSourceContext<>(8); @@ -509,7 +494,7 @@ public class OracleSourceTest extends AbstractTestBase { // make sure there is no more events assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext)); - try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer); + try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { statement.execute("INSERT INTO debezium.category VALUES (1, 'book')"); @@ -568,22 +553,21 @@ public class OracleSourceTest extends AbstractTestBase { // ------------------------------------------------------------------------------------------ private DebeziumSourceFunction createOracleLogminerSource() { - return basicSourceBuilder(oracleContainer).build(); + return basicSourceBuilder().build(); } - private OracleSource.Builder basicSourceBuilder(OracleContainer oracleContainer) { + private OracleSource.Builder basicSourceBuilder() { Properties debeziumProperties = new Properties(); debeziumProperties.setProperty("debezium.log.mining.strategy", "online_catalog"); - debeziumProperties.setProperty("debezium.log.mining.continuous.mine", "true"); - // ignore APEX XE system tables changes + // ignore APEX ORCLCDB system tables changes debeziumProperties.setProperty("database.history.store.only.captured.tables.ddl", "true"); return OracleSource.builder() - .hostname(oracleContainer.getHost()) - .port(oracleContainer.getOraclePort()) - .database("XE") + .hostname(ORACLE_CONTAINER.getHost()) + .port(ORACLE_CONTAINER.getOraclePort()) + .database("ORCLCDB") .tableList("debezium" + "." + "products") // monitor table "products" - .username(oracleContainer.getUsername()) - .password(oracleContainer.getPassword()) + .username(ORACLE_CONTAINER.getUsername()) + .password(ORACLE_CONTAINER.getPassword()) .debeziumProperties(debeziumProperties) .deserializer(new ForwardDeserializeSchema()); } diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java index ff083009e..49ddd9753 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; @@ -34,23 +33,15 @@ import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.regex.Matcher; -import java.util.stream.Collectors; import static java.lang.String.format; import static org.apache.flink.util.Preconditions.checkState; -import static org.junit.Assert.assertNotNull; /** IT tests for {@link OracleSourceBuilder.OracleIncrementalSource}. */ public class OracleSourceITCase extends OracleSourceTestBase { @@ -148,7 +139,6 @@ public class OracleSourceITCase extends OracleSourceTestBase { + " 'table-name' = '%s'," + " 'scan.incremental.snapshot.enabled' = 'false'," + " 'debezium.log.mining.strategy' = 'online_catalog'," - + " 'debezium.log.mining.continuous.mine' = 'true'," + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'" + ")", ORACLE_CONTAINER.getHost(), @@ -280,44 +270,8 @@ public class OracleSourceITCase extends OracleSourceTestBase { } } - private void createAndInitialize(String sqlFile) throws Exception { - final String ddlFile = String.format("ddl/%s", sqlFile); - final URL ddlTestFile = OracleSourceITCase.class.getClassLoader().getResource(ddlFile); - assertNotNull("Cannot locate " + ddlFile, ddlTestFile); - try (Connection connection = getConnection(); - Statement statement = connection.createStatement()) { - - try { - // DROP TABLE IF EXITS - statement.execute("DROP TABLE DEBEZIUM.CUSTOMERS"); - statement.execute("DROP TABLE DEBEZIUM.CUSTOMERS_1"); - } catch (Exception e) { - LOG.error("DEBEZIUM.CUSTOMERS DEBEZIUM.CUSTOMERS_1 NOT EXITS", e); - } - - final List statements = - Arrays.stream( - Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() - .map(String::trim) - .filter(x -> !x.startsWith("--") && !x.isEmpty()) - .map( - x -> { - final Matcher m = - COMMENT_PATTERN.matcher(x); - return m.matches() ? m.group(1) : x; - }) - .collect(Collectors.joining("\n")) - .split(";")) - .collect(Collectors.toList()); - - for (String stmt : statements) { - statement.execute(stmt); - } - } - } - private void executeSql(String sql) throws Exception { - try (Connection connection = getConnection(); + try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { statement.execute(sql); } @@ -372,27 +326,4 @@ public class OracleSourceITCase extends OracleSourceTestBase { afterFailAction.run(); miniCluster.startTaskManager(); } - - private static void waitForSinkSize(String sinkName, int expectedSize) - throws InterruptedException { - while (sinkSize(sinkName) < expectedSize) { - Thread.sleep(100); - } - } - - private static int sinkSize(String sinkName) { - synchronized (TestValuesTableFactory.class) { - try { - return TestValuesTableFactory.getRawResults(sinkName).size(); - } catch (IllegalArgumentException e) { - // job is not started yet - return 0; - } - } - } - - public Connection getConnection() throws SQLException { - return DriverManager.getConnection( - ORACLE_CONTAINER.getJdbcUrl(), ORACLE_SYSTEM_USER, ORACLE_SYSTEM_PASSWORD); - } } diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java index 51ac1b329..86c06797a 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java @@ -21,13 +21,11 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter; -import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils; -import org.junit.After; +import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase; import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -35,12 +33,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.OracleContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.time.ZoneId; @@ -48,7 +42,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; @@ -56,26 +49,25 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_PWD; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_USER; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.ORACLE_CONTAINER; import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.assertEqualsInAnyOrder; -import static com.ververica.cdc.connectors.oracle.utils.OracleTestUtils.CONNECTOR_PWD; -import static com.ververica.cdc.connectors.oracle.utils.OracleTestUtils.CONNECTOR_USER; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.createAndInitialize; +import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.getJdbcConnection; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** Integration tests for Oracle redo log SQL source. */ @RunWith(Parameterized.class) -public class OracleConnectorITCase extends AbstractTestBase { +public class OracleConnectorITCase extends OracleSourceTestBase { private static final int RECORDS_COUNT = 10_000; private static final int WORKERS_COUNT = 4; private static final Logger LOG = LoggerFactory.getLogger(OracleConnectorITCase.class); - private OracleContainer oracleContainer = - OracleTestUtils.ORACLE_CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)); - private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private final StreamTableEnvironment tEnv = @@ -96,12 +88,7 @@ public class OracleConnectorITCase extends AbstractTestBase { @Before public void before() throws Exception { - LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(oracleContainer)).join(); - LOG.info("Containers are started."); - TestValuesTableFactory.clearAllData(); - if (parallelismSnapshot) { env.setParallelism(4); env.enableCheckpointing(200); @@ -110,14 +97,10 @@ public class OracleConnectorITCase extends AbstractTestBase { } } - @After - public void teardown() { - oracleContainer.stop(); - } - @Test - public void testConsumingAllEvents() - throws SQLException, ExecutionException, InterruptedException { + public void testConsumingAllEvents() throws Exception { + + createAndInitialize("product.sql"); String sourceDDL = String.format( "CREATE TABLE debezium_source (" @@ -133,17 +116,16 @@ public class OracleConnectorITCase extends AbstractTestBase { + " 'password' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'debezium.log.mining.strategy' = 'online_catalog'," - + " 'debezium.log.mining.continuous.mine' = 'true'," + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'," + " 'scan.incremental.snapshot.chunk.size' = '2'," - + " 'database-name' = 'XE'," + + " 'database-name' = 'ORCLCDB'," + " 'schema-name' = '%s'," + " 'table-name' = '%s'" + ")", - oracleContainer.getHost(), - oracleContainer.getOraclePort(), - "dbzuser", - "dbz", + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), + CONNECTOR_USER, + CONNECTOR_PWD, parallelismSnapshot, "debezium", "products"); @@ -225,8 +207,9 @@ public class OracleConnectorITCase extends AbstractTestBase { } @Test - public void testConsumingAllEventsByChunkKeyColumn() - throws SQLException, ExecutionException, InterruptedException { + public void testConsumingAllEventsByChunkKeyColumn() throws Exception { + + createAndInitialize("product.sql"); if (!parallelismSnapshot) { return; } @@ -246,15 +229,14 @@ public class OracleConnectorITCase extends AbstractTestBase { + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'scan.incremental.snapshot.chunk.key-column' = 'ID'," + " 'debezium.log.mining.strategy' = 'online_catalog'," - + " 'debezium.log.mining.continuous.mine' = 'true'," + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'," + " 'scan.incremental.snapshot.chunk.size' = '2'," - + " 'database-name' = 'XE'," + + " 'database-name' = 'ORCLCDB'," + " 'schema-name' = '%s'," + " 'table-name' = '%s'" + ")", - oracleContainer.getHost(), - oracleContainer.getOraclePort(), + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), "dbzuser", "dbz", parallelismSnapshot, @@ -318,6 +300,8 @@ public class OracleConnectorITCase extends AbstractTestBase { @Test public void testMetadataColumns() throws Throwable { + + createAndInitialize("product.sql"); String sourceDDL = String.format( "CREATE TABLE debezium_source (" @@ -337,16 +321,15 @@ public class OracleConnectorITCase extends AbstractTestBase { + " 'password' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'debezium.log.mining.strategy' = 'online_catalog'," - + " 'debezium.log.mining.continuous.mine' = 'true'," // + " 'debezium.database.history.store.only.captured.tables.ddl' = // 'true'," + " 'scan.incremental.snapshot.chunk.size' = '2'," - + " 'database-name' = 'XE'," + + " 'database-name' = 'ORCLCDB'," + " 'schema-name' = '%s'," + " 'table-name' = '%s'" + ")", - oracleContainer.getHost(), - oracleContainer.getOraclePort(), + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), "dbzuser", "dbz", parallelismSnapshot, @@ -393,22 +376,22 @@ public class OracleConnectorITCase extends AbstractTestBase { waitForSinkSize("sink", 16); List expected = Arrays.asList( - "+I[XE, DEBEZIUM, PRODUCTS, 101, scooter, Small 2-wheel scooter, 3.140]", - "+I[XE, DEBEZIUM, PRODUCTS, 102, car battery, 12V car battery, 8.100]", - "+I[XE, DEBEZIUM, PRODUCTS, 103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]", - "+I[XE, DEBEZIUM, PRODUCTS, 104, hammer, 12oz carpenters hammer, 0.750]", - "+I[XE, DEBEZIUM, PRODUCTS, 105, hammer, 14oz carpenters hammer, 0.875]", - "+I[XE, DEBEZIUM, PRODUCTS, 106, hammer, 16oz carpenters hammer, 1.000]", - "+I[XE, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.300]", - "+I[XE, DEBEZIUM, PRODUCTS, 108, jacket, water resistent black wind breaker, 0.100]", - "+I[XE, DEBEZIUM, PRODUCTS, 109, spare tire, 24 inch spare tire, 22.200]", - "+I[XE, DEBEZIUM, PRODUCTS, 111, jacket, water resistent white wind breaker, 0.200]", - "+I[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.180]", - "+U[XE, DEBEZIUM, PRODUCTS, 106, hammer, 18oz carpenter hammer, 1.000]", - "+U[XE, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.100]", - "+U[XE, DEBEZIUM, PRODUCTS, 111, jacket, new water resistent white wind breaker, 0.500]", - "+U[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]", - "-D[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]"); + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 101, scooter, Small 2-wheel scooter, 3.140]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 102, car battery, 12V car battery, 8.100]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 104, hammer, 12oz carpenters hammer, 0.750]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 105, hammer, 14oz carpenters hammer, 0.875]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 106, hammer, 16oz carpenters hammer, 1.000]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.300]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 108, jacket, water resistent black wind breaker, 0.100]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 109, spare tire, 24 inch spare tire, 22.200]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 111, jacket, water resistent white wind breaker, 0.200]", + "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.180]", + "+U[ORCLCDB, DEBEZIUM, PRODUCTS, 106, hammer, 18oz carpenter hammer, 1.000]", + "+U[ORCLCDB, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.100]", + "+U[ORCLCDB, DEBEZIUM, PRODUCTS, 111, jacket, new water resistent white wind breaker, 0.500]", + "+U[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]", + "-D[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]"); List actual = TestValuesTableFactory.getRawResults("sink"); Collections.sort(expected); @@ -419,6 +402,8 @@ public class OracleConnectorITCase extends AbstractTestBase { @Test public void testStartupFromLatestOffset() throws Exception { + + createAndInitialize("product.sql"); String sourceDDL = String.format( "CREATE TABLE debezium_source (" @@ -434,15 +419,14 @@ public class OracleConnectorITCase extends AbstractTestBase { + " 'password' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'debezium.log.mining.strategy' = 'online_catalog'," - + " 'debezium.log.mining.continuous.mine' = 'true'," + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'," - + " 'database-name' = 'XE'," + + " 'database-name' = 'ORCLCDB'," + " 'schema-name' = '%s'," + " 'table-name' = '%s' ," + " 'scan.startup.mode' = 'latest-offset'" + ")", - oracleContainer.getHost(), - oracleContainer.getOraclePort(), + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), "dbzuser", "dbz", parallelismSnapshot, @@ -489,7 +473,7 @@ public class OracleConnectorITCase extends AbstractTestBase { @Test public void testConsumingNumericColumns() throws Exception { // Prepare numeric type data - try (Connection connection = OracleTestUtils.testConnection(oracleContainer); + try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { statement.execute( "CREATE TABLE debezium.test_numeric_table (" @@ -534,14 +518,13 @@ public class OracleConnectorITCase extends AbstractTestBase { + " 'password' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'debezium.log.mining.strategy' = 'online_catalog'," - + " 'debezium.log.mining.continuous.mine' = 'true'," + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'," - + " 'database-name' = 'XE'," + + " 'database-name' = 'ORCLCDB'," + " 'schema-name' = '%s'," + " 'table-name' = '%s'" + ")", - oracleContainer.getHost(), - oracleContainer.getOraclePort(), + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), "dbzuser", "dbz", parallelismSnapshot, @@ -588,91 +571,9 @@ public class OracleConnectorITCase extends AbstractTestBase { result.getJobClient().get().cancel().get(); } - @Test - public void testXmlType() throws Exception { - // Prepare xml type data - try (Connection connection = OracleTestUtils.testConnection(oracleContainer); - Statement statement = connection.createStatement()) { - statement.execute( - "CREATE TABLE debezium.xmltype_table (" - + " ID NUMBER(4)," - + " T15VARCHAR sys.xmltype," - + " PRIMARY KEY (ID))"); - statement.execute( - "INSERT INTO debezium.xmltype_table " - + "VALUES (11, sys.xmlType.createXML('test xmlType'))"); - } - - String sourceDDL = - String.format( - "CREATE TABLE test_xmltype_table (" - + " ID INT," - + " T15VARCHAR STRING," - + " PRIMARY KEY (ID) NOT ENFORCED" - + ") WITH (" - + " 'connector' = 'oracle-cdc'," - + " 'hostname' = '%s'," - + " 'port' = '%s'," - + " 'username' = '%s'," - + " 'password' = '%s'," - + " 'scan.incremental.snapshot.enabled' = '%s'," - + " 'debezium.log.mining.strategy' = 'online_catalog'," - + " 'debezium.log.mining.continuous.mine' = 'true'," - + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'," - + " 'scan.incremental.snapshot.chunk.size' = '2'," - + " 'database-name' = 'XE'," - + " 'schema-name' = '%s'," - + " 'table-name' = '%s'" - + ")", - oracleContainer.getHost(), - oracleContainer.getOraclePort(), - "dbzuser", - "dbz", - parallelismSnapshot, - "debezium", - "xmltype_table"); - String sinkDDL = - "CREATE TABLE test_xmltype_sink (" - + " id INT," - + " T15VARCHAR STRING," - + " PRIMARY KEY (id) NOT ENFORCED" - + ") WITH (" - + " 'connector' = 'values'," - + " 'sink-insert-only' = 'false'," - + " 'sink-expected-messages-num' = '1'" - + ")"; - tEnv.executeSql(sourceDDL); - tEnv.executeSql(sinkDDL); - - // async submit job - TableResult result = - tEnv.executeSql("INSERT INTO test_xmltype_sink SELECT * FROM test_xmltype_table"); - - waitForSnapshotStarted("test_xmltype_sink"); - - // waiting for change events finished. - waitForSinkSize("test_xmltype_sink", 1); - - String lineSeparator = System.getProperty("line.separator"); - String expectedResult = - String.format( - "+I[11, %s" - + " test xmlType%s" - + "]", - lineSeparator, lineSeparator); - - List expected = Arrays.asList(expectedResult); - - List actual = TestValuesTableFactory.getRawResults("test_xmltype_sink"); - Collections.sort(actual); - assertEquals(expected, actual); - result.getJobClient().get().cancel().get(); - } - @Test public void testAllDataTypes() throws Throwable { - OracleTestUtils.createAndInitialize( - OracleTestUtils.ORACLE_CONTAINER, "column_type_test.sql"); + createAndInitialize("column_type_test.sql"); tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); String sourceDDL = @@ -728,15 +629,14 @@ public class OracleConnectorITCase extends AbstractTestBase { + " 'password' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'debezium.log.mining.strategy' = 'online_catalog'," - + " 'debezium.log.mining.continuous.mine' = 'true'," + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'," + " 'scan.incremental.snapshot.chunk.size' = '2'," - + " 'database-name' = 'XE'," + + " 'database-name' = 'ORCLCDB'," + " 'schema-name' = '%s'," + " 'table-name' = '%s'" + ")", - oracleContainer.getHost(), - oracleContainer.getOraclePort(), + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), CONNECTOR_USER, CONNECTOR_PWD, parallelismSnapshot, @@ -777,8 +677,8 @@ public class OracleConnectorITCase extends AbstractTestBase { + "-110451600000000, " + "-93784560000, " + "\n" - + " test xmlType\n" - + "]" + + " test xmlType\n" + + "\n]" }; List actual = TestValuesTableFactory.getResults("sink"); @@ -809,14 +709,13 @@ public class OracleConnectorITCase extends AbstractTestBase { + " 'table-name' = 'category'," + " 'scan.incremental.snapshot.enabled' = 'false'," + " 'debezium.log.mining.strategy' = 'online_catalog'," - + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'," - + " 'debezium.log.mining.continuous.mine' = 'true'" + + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'" + ")", - oracleContainer.getHost(), - oracleContainer.getOraclePort(), + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), "dbzuser", "dbz", - "XE", + "ORCLCDB", "debezium"); String sinkDDL = @@ -940,8 +839,4 @@ public class OracleConnectorITCase extends AbstractTestBase { } } } - - public Connection getJdbcConnection() throws SQLException { - return DriverManager.getConnection(oracleContainer.getJdbcUrl(), "dbzuser", "dbz"); - } } diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/utils/OracleTestUtils.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/utils/OracleTestUtils.java deleted file mode 100644 index 83641d145..000000000 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/utils/OracleTestUtils.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2023 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oracle.utils; - -import com.ververica.cdc.connectors.oracle.source.OracleSourceITCase; -import org.testcontainers.containers.OracleContainer; - -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Arrays; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import static org.junit.Assert.assertNotNull; - -/** Utility class for oracle tests. */ -public class OracleTestUtils { - - // You can build OracleContainer from official oracle docker image in following way, we use - // prebuilt image for time cost consideration - // ----------------- begin -------------------------- - // new OracleContainer(new ImageFromDockerfile("oracle-xe-11g-tmp") - // .withFileFromClasspath(".", "docker") - // .withFileFromClasspath( - // "assets/activate-archivelog.sh", - // "docker/assets/activate-archivelog.sh") - // .withFileFromClasspath( - // "assets/activate-archivelog.sql", - // "docker/assets/activate-archivelog.sql") - // ----------------- end -------------------------- - private static final String ORACLE_IMAGE = "jark/oracle-xe-11g-r2-cdc:0.1"; - - protected static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); - - public static final OracleContainer ORACLE_CONTAINER = new OracleContainer(ORACLE_IMAGE); - - public static final String CONNECTOR_USER = "dbzuser"; - - public static final String CONNECTOR_PWD = "dbz"; - - public static final String SCHEMA_USER = "debezium"; - - public static final String SCHEMA_PWD = "dbz"; - - public static Connection getJdbcConnection(OracleContainer oracleContainer) - throws SQLException { - return DriverManager.getConnection( - oracleContainer.getJdbcUrl(), CONNECTOR_USER, CONNECTOR_PWD); - } - - public static Connection testConnection(OracleContainer oracleContainer) throws SQLException { - return DriverManager.getConnection(oracleContainer.getJdbcUrl(), SCHEMA_USER, SCHEMA_PWD); - } - - public static void createAndInitialize(OracleContainer oracleContainer, String sqlFile) - throws Exception { - final String ddlFile = String.format("ddl/%s", sqlFile); - final URL ddlTestFile = OracleSourceITCase.class.getClassLoader().getResource(ddlFile); - assertNotNull("Cannot locate " + ddlFile, ddlTestFile); - try (Connection connection = OracleTestUtils.testConnection(oracleContainer); - Statement statement = connection.createStatement()) { - - final List statements = - Arrays.stream( - Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() - .map(String::trim) - .filter(x -> !x.startsWith("--") && !x.isEmpty()) - .map( - x -> { - final Matcher m = - COMMENT_PATTERN.matcher(x); - return m.matches() ? m.group(1) : x; - }) - .collect(Collectors.joining("\n")) - .split(";")) - .collect(Collectors.toList()); - - for (String stmt : statements) { - statement.execute(stmt); - } - } - } -}