[oracle] Refactor oracle unit test

pull/2751/head
gongzhongqiang 1 year ago committed by Leonard Xu
parent 721c6d9608
commit f4f3a4092a

@ -209,7 +209,7 @@ Flink SQL> CREATE TABLE products (
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'database-name' = 'ORCLCDB',
'schema-name' = 'inventory',
'table-name' = 'products');
@ -436,7 +436,7 @@ CREATE TABLE products (
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'database-name' = 'ORCLCDB',
'schema-name' = 'inventory',
'table-name' = 'products',
'debezium.log.mining.strategy' = 'online_catalog',
@ -491,13 +491,12 @@ public class OracleParallelSourceExample {
public static void main(String[] args) throws Exception {
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy", "online_catalog");
debeziumProperties.setProperty("log.mining.continuous.mine", "true");
JdbcIncrementalSource<String> oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("XE")
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
@ -538,7 +537,7 @@ public class OracleSourceExample {
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
.url("jdbc:oracle:thin:@{hostname}:{port}:{database}")
.port(1521)
.database("XE") // monitor XE database
.database("ORCLCDB") // monitor XE database
.schemaList("inventory") // monitor inventory schema
.tableList("inventory.products") // monitor products table
.username("flinkuser")

@ -6,7 +6,7 @@
version: '2.1'
services:
oracle:
image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0
image: goodboy008/oracle-19.3.0-ee:non-cdb
ports:
- "1521:1521"
elasticsearch:
@ -77,7 +77,7 @@ Flink SQL> CREATE TABLE products (
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'table-name' = 'products'
);
@ -95,7 +95,7 @@ Flink SQL> CREATE TABLE orders (
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'table-name' = 'orders'
);

@ -6,7 +6,7 @@
version: '2.1'
services:
oracle:
image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0
image: goodboy008/oracle-19.3.0-ee:non-cdb
ports:
- "1521:1521"
elasticsearch:
@ -75,7 +75,7 @@ Flink SQL> CREATE TABLE products (
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'table-name' = 'products'
);
@ -93,7 +93,7 @@ Flink SQL> CREATE TABLE orders (
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'database-name' = 'ORCLCDB',
'schema-name' = 'flinkuser',
'table-name' = 'orders'
);

@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import java.nio.file.Path;
import java.sql.Connection;
@ -39,21 +40,21 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_PWD;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_USER;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.ORACLE_DATABASE;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_PWD;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.TEST_USER;
/** End-to-end tests for oracle-cdc connector uber jar. */
public class OracleE2eITCase extends FlinkContainerTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(OracleE2eITCase.class);
private static final String ORACLE_SYSTEM_USER = "system";
private static final String ORACLE_SYSTEM_PASSWORD = "oracle";
private static final String ORACLE_TEST_USER = "dbzuser";
private static final String ORACLE_TEST_PASSWORD = "dbz";
private static final String ORACLE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver";
private static final String INTER_CONTAINER_ORACLE_ALIAS = "oracle";
private static final String ORACLE_IMAGE = "jark/oracle-xe-11g-r2-cdc:0.1";
private static final int ORACLE_PORT = 1521;
private static final Path oracleCdcJar = TestUtils.getResource("oracle-cdc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
public static final String ORACLE_IMAGE = "goodboy008/oracle-19.3.0-ee";
private static OracleContainer oracle;
@Before
@ -62,10 +63,15 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
LOG.info("Starting containers...");
oracle =
new OracleContainer(ORACLE_IMAGE)
new OracleContainer(DockerImageName.parse(ORACLE_IMAGE).withTag("non-cdb"))
.withUsername(CONNECTOR_USER)
.withPassword(CONNECTOR_PWD)
.withDatabaseName(ORACLE_DATABASE)
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_ORACLE_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
.withLogConsumer(new Slf4jLogConsumer(LOG))
.withReuse(true);
Startables.deepStart(Stream.of(oracle)).join();
LOG.info("Containers are started.");
}
@ -101,15 +107,14 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
" primary key (`ID`) not enforced",
") WITH (",
" 'connector' = 'oracle-cdc',",
" 'hostname' = '" + INTER_CONTAINER_ORACLE_ALIAS + "',",
" 'port' = '" + ORACLE_PORT + "',",
" 'username' = '" + ORACLE_SYSTEM_USER + "',",
" 'password' = '" + ORACLE_SYSTEM_PASSWORD + "',",
" 'database-name' = 'XE',",
" 'hostname' = '" + oracle.getNetworkAliases().get(0) + "',",
" 'port' = '" + oracle.getExposedPorts().get(0) + "',",
" 'username' = '" + CONNECTOR_USER + "',",
" 'password' = '" + CONNECTOR_PWD + "',",
" 'database-name' = 'ORCLCDB',",
" 'schema-name' = 'DEBEZIUM',",
" 'scan.incremental.snapshot.enabled' = 'true',",
" 'debezium.log.mining.strategy' = 'online_catalog',",
" 'debezium.log.mining.continuous.mine' = 'true',",
" 'table-name' = 'PRODUCTS',",
" 'scan.incremental.snapshot.chunk.size' = '4'",
");",
@ -135,7 +140,7 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
submitSQLJob(sqlLines, oracleCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
// generate binlogs
// generate redo log
Class.forName(ORACLE_DRIVER_CLASS);
// we need to set this property, otherwise Azure Pipeline will complain
// "ORA-01882: timezone region not found" error when building the Oracle JDBC connection
@ -189,7 +194,6 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
}
private Connection getOracleJdbcConnection() throws SQLException {
return DriverManager.getConnection(
oracle.getJdbcUrl(), ORACLE_TEST_USER, ORACLE_TEST_PASSWORD);
return DriverManager.getConnection(oracle.getJdbcUrl(), TEST_USER, TEST_PWD);
}
}

@ -21,6 +21,9 @@ under the License.
<groupId>com.ververica</groupId>
<version>2.4-SNAPSHOT</version>
</parent>
<properties>
<xdb.version>19.3.0.0</xdb.version>
</properties>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-connector-oracle-cdc</artifactId>
@ -88,7 +91,7 @@ under the License.
<dependency>
<groupId>com.oracle.database.xml</groupId>
<artifactId>xdb</artifactId>
<version>19.3.0.0</version>
<version>${xdb.version}</version>
</dependency>
<dependency>
@ -150,14 +153,14 @@ under the License.
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
<version>${json-path.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
<version>${commons-lang3.version}</version>
</dependency>
</dependencies>

@ -26,32 +26,24 @@ import org.apache.flink.test.util.MiniClusterWithClientResource;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.util.Properties;
import java.util.stream.Stream;
/** Example Tests for {@link JdbcIncrementalSource}. */
public class OracleChangeEventSourceExampleTest {
public class OracleChangeEventSourceExampleTest extends OracleSourceTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(OracleChangeEventSourceExampleTest.class);
private static final int DEFAULT_PARALLELISM = 4;
private static final long DEFAULT_CHECKPOINT_INTERVAL = 1000;
private static final OracleContainer oracleContainer =
OracleTestUtils.ORACLE_CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG));
@Rule
public final MiniClusterWithClientResource miniClusterResource =
@ -64,40 +56,30 @@ public class OracleChangeEventSourceExampleTest {
.withHaLeadershipControl()
.build());
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(oracleContainer)).join();
LOG.info("Containers are started.");
}
@After
public void teardown() {
oracleContainer.stop();
}
@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testConsumingAllEvents() throws Exception {
createAndInitialize("product.sql");
LOG.info(
"getOraclePort:{},getUsername:{},getPassword:{}",
oracleContainer.getOraclePort(),
oracleContainer.getUsername(),
oracleContainer.getPassword());
ORACLE_CONTAINER.getOraclePort(),
ORACLE_CONTAINER.getUsername(),
ORACLE_CONTAINER.getPassword());
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("log.mining.strategy", "online_catalog");
debeziumProperties.setProperty("log.mining.continuous.mine", "true");
JdbcIncrementalSource<String> oracleChangeEventSource =
new OracleSourceBuilder()
.hostname(oracleContainer.getHost())
.port(oracleContainer.getOraclePort())
.databaseList("XE")
.schemaList("DEBEZIUM")
new OracleSourceBuilder<String>()
.hostname(ORACLE_CONTAINER.getHost())
.port(ORACLE_CONTAINER.getOraclePort())
.databaseList(ORACLE_DATABASE)
.schemaList(ORACLE_SCHEMA)
.tableList("DEBEZIUM.PRODUCTS")
.username(oracleContainer.getUsername())
.password(oracleContainer.getPassword())
.username(ORACLE_CONTAINER.getUsername())
.password(ORACLE_CONTAINER.getPassword())
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())

@ -29,25 +29,19 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.jayway.jsonpath.JsonPath;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase;
import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
@ -64,7 +58,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertDelete;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertInsert;
@ -75,34 +68,21 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/** Tests for {@link OracleSource} which also heavily tests {@link DebeziumSourceFunction}. */
public class OracleSourceTest extends AbstractTestBase {
public class OracleSourceTest extends OracleSourceTestBase {
private static final Logger LOG = LoggerFactory.getLogger(OracleSourceTest.class);
private OracleContainer oracleContainer =
OracleTestUtils.ORACLE_CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG));
@Before
public void before() throws Exception {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(oracleContainer)).join();
LOG.info("Containers are started.");
}
@After
public void teardown() {
oracleContainer.stop();
}
@Test
public void testConsumingAllEvents() throws Exception {
createAndInitialize("product.sql");
DebeziumSourceFunction<SourceRecord> source = createOracleLogminerSource();
TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
setupSource(source);
try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
// start the source
@ -169,6 +149,9 @@ public class OracleSourceTest extends AbstractTestBase {
@Test
@Ignore("It can be open until DBZ-5245 and DBZ-4936 fix")
public void testCheckpointAndRestore() throws Exception {
createAndInitialize("product.sql");
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
{
@ -249,7 +232,7 @@ public class OracleSourceTest extends AbstractTestBase {
// make sure there is no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2));
try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
@ -309,7 +292,7 @@ public class OracleSourceTest extends AbstractTestBase {
assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext3));
// can continue to receive new events
try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("DELETE FROM debezium.products WHERE id=1001");
}
@ -373,11 +356,13 @@ public class OracleSourceTest extends AbstractTestBase {
@Test
@Ignore("Debezium Oracle connector don't monitor unknown tables since 1.6, see DBZ-3612")
public void testRecoverFromRenameOperation() throws Exception {
createAndInitialize("product.sql");
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
{
try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
// Step-1: start the source from empty state
final DebeziumSourceFunction<SourceRecord> source = createOracleLogminerSource();
@ -450,7 +435,7 @@ public class OracleSourceTest extends AbstractTestBase {
// make sure there is no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2));
try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (113,'Airplane','Toy airplane',1.304)"); // 113
@ -466,6 +451,8 @@ public class OracleSourceTest extends AbstractTestBase {
@Test
public void testConsumingEmptyTable() throws Exception {
createAndInitialize("product.sql");
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
int prevPos = 0;
@ -474,9 +461,7 @@ public class OracleSourceTest extends AbstractTestBase {
// Step-1: start the source from empty state
// ---------------------------------------------------------------------------
DebeziumSourceFunction<SourceRecord> source =
basicSourceBuilder(oracleContainer)
.tableList("debezium" + "." + "category")
.build();
basicSourceBuilder().tableList("debezium.category").build();
// we use blocking context to block the source to emit before last snapshot record
final BlockingSourceContext<SourceRecord> sourceContext =
new BlockingSourceContext<>(8);
@ -509,7 +494,7 @@ public class OracleSourceTest extends AbstractTestBase {
// make sure there is no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext));
try (Connection connection = OracleTestUtils.getJdbcConnection(oracleContainer);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("INSERT INTO debezium.category VALUES (1, 'book')");
@ -568,22 +553,21 @@ public class OracleSourceTest extends AbstractTestBase {
// ------------------------------------------------------------------------------------------
private DebeziumSourceFunction<SourceRecord> createOracleLogminerSource() {
return basicSourceBuilder(oracleContainer).build();
return basicSourceBuilder().build();
}
private OracleSource.Builder<SourceRecord> basicSourceBuilder(OracleContainer oracleContainer) {
private OracleSource.Builder<SourceRecord> basicSourceBuilder() {
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("debezium.log.mining.strategy", "online_catalog");
debeziumProperties.setProperty("debezium.log.mining.continuous.mine", "true");
// ignore APEX XE system tables changes
// ignore APEX ORCLCDB system tables changes
debeziumProperties.setProperty("database.history.store.only.captured.tables.ddl", "true");
return OracleSource.<SourceRecord>builder()
.hostname(oracleContainer.getHost())
.port(oracleContainer.getOraclePort())
.database("XE")
.hostname(ORACLE_CONTAINER.getHost())
.port(ORACLE_CONTAINER.getOraclePort())
.database("ORCLCDB")
.tableList("debezium" + "." + "products") // monitor table "products"
.username(oracleContainer.getUsername())
.password(oracleContainer.getPassword())
.username(ORACLE_CONTAINER.getUsername())
.password(ORACLE_CONTAINER.getPassword())
.debeziumProperties(debeziumProperties)
.deserializer(new ForwardDeserializeSchema());
}

@ -23,7 +23,6 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@ -34,23 +33,15 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertNotNull;
/** IT tests for {@link OracleSourceBuilder.OracleIncrementalSource}. */
public class OracleSourceITCase extends OracleSourceTestBase {
@ -148,7 +139,6 @@ public class OracleSourceITCase extends OracleSourceTestBase {
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = 'false',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.log.mining.continuous.mine' = 'true',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'"
+ ")",
ORACLE_CONTAINER.getHost(),
@ -280,44 +270,8 @@ public class OracleSourceITCase extends OracleSourceTestBase {
}
}
private void createAndInitialize(String sqlFile) throws Exception {
final String ddlFile = String.format("ddl/%s", sqlFile);
final URL ddlTestFile = OracleSourceITCase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getConnection();
Statement statement = connection.createStatement()) {
try {
// DROP TABLE IF EXITS
statement.execute("DROP TABLE DEBEZIUM.CUSTOMERS");
statement.execute("DROP TABLE DEBEZIUM.CUSTOMERS_1");
} catch (Exception e) {
LOG.error("DEBEZIUM.CUSTOMERS DEBEZIUM.CUSTOMERS_1 NOT EXITS", e);
}
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
}
}
private void executeSql(String sql) throws Exception {
try (Connection connection = getConnection();
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
}
@ -372,27 +326,4 @@ public class OracleSourceITCase extends OracleSourceTestBase {
afterFailAction.run();
miniCluster.startTaskManager();
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
public Connection getConnection() throws SQLException {
return DriverManager.getConnection(
ORACLE_CONTAINER.getJdbcUrl(), ORACLE_SYSTEM_USER, ORACLE_SYSTEM_PASSWORD);
}
}

@ -21,13 +21,11 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import org.junit.After;
import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@ -35,12 +33,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
@ -48,7 +42,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
@ -56,26 +49,25 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_PWD;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.CONNECTOR_USER;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.ORACLE_CONTAINER;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.assertEqualsInAnyOrder;
import static com.ververica.cdc.connectors.oracle.utils.OracleTestUtils.CONNECTOR_PWD;
import static com.ververica.cdc.connectors.oracle.utils.OracleTestUtils.CONNECTOR_USER;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.createAndInitialize;
import static com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase.getJdbcConnection;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/** Integration tests for Oracle redo log SQL source. */
@RunWith(Parameterized.class)
public class OracleConnectorITCase extends AbstractTestBase {
public class OracleConnectorITCase extends OracleSourceTestBase {
private static final int RECORDS_COUNT = 10_000;
private static final int WORKERS_COUNT = 4;
private static final Logger LOG = LoggerFactory.getLogger(OracleConnectorITCase.class);
private OracleContainer oracleContainer =
OracleTestUtils.ORACLE_CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG));
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
@ -96,12 +88,7 @@ public class OracleConnectorITCase extends AbstractTestBase {
@Before
public void before() throws Exception {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(oracleContainer)).join();
LOG.info("Containers are started.");
TestValuesTableFactory.clearAllData();
if (parallelismSnapshot) {
env.setParallelism(4);
env.enableCheckpointing(200);
@ -110,14 +97,10 @@ public class OracleConnectorITCase extends AbstractTestBase {
}
}
@After
public void teardown() {
oracleContainer.stop();
}
@Test
public void testConsumingAllEvents()
throws SQLException, ExecutionException, InterruptedException {
public void testConsumingAllEvents() throws Exception {
createAndInitialize("product.sql");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -133,17 +116,16 @@ public class OracleConnectorITCase extends AbstractTestBase {
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.log.mining.continuous.mine' = 'true',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'scan.incremental.snapshot.chunk.size' = '2',"
+ " 'database-name' = 'XE',"
+ " 'database-name' = 'ORCLCDB',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
oracleContainer.getHost(),
oracleContainer.getOraclePort(),
"dbzuser",
"dbz",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
CONNECTOR_USER,
CONNECTOR_PWD,
parallelismSnapshot,
"debezium",
"products");
@ -225,8 +207,9 @@ public class OracleConnectorITCase extends AbstractTestBase {
}
@Test
public void testConsumingAllEventsByChunkKeyColumn()
throws SQLException, ExecutionException, InterruptedException {
public void testConsumingAllEventsByChunkKeyColumn() throws Exception {
createAndInitialize("product.sql");
if (!parallelismSnapshot) {
return;
}
@ -246,15 +229,14 @@ public class OracleConnectorITCase extends AbstractTestBase {
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'scan.incremental.snapshot.chunk.key-column' = 'ID',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.log.mining.continuous.mine' = 'true',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'scan.incremental.snapshot.chunk.size' = '2',"
+ " 'database-name' = 'XE',"
+ " 'database-name' = 'ORCLCDB',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
oracleContainer.getHost(),
oracleContainer.getOraclePort(),
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
"dbzuser",
"dbz",
parallelismSnapshot,
@ -318,6 +300,8 @@ public class OracleConnectorITCase extends AbstractTestBase {
@Test
public void testMetadataColumns() throws Throwable {
createAndInitialize("product.sql");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -337,16 +321,15 @@ public class OracleConnectorITCase extends AbstractTestBase {
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.log.mining.continuous.mine' = 'true',"
// + " 'debezium.database.history.store.only.captured.tables.ddl' =
// 'true',"
+ " 'scan.incremental.snapshot.chunk.size' = '2',"
+ " 'database-name' = 'XE',"
+ " 'database-name' = 'ORCLCDB',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
oracleContainer.getHost(),
oracleContainer.getOraclePort(),
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
"dbzuser",
"dbz",
parallelismSnapshot,
@ -393,22 +376,22 @@ public class OracleConnectorITCase extends AbstractTestBase {
waitForSinkSize("sink", 16);
List<String> expected =
Arrays.asList(
"+I[XE, DEBEZIUM, PRODUCTS, 101, scooter, Small 2-wheel scooter, 3.140]",
"+I[XE, DEBEZIUM, PRODUCTS, 102, car battery, 12V car battery, 8.100]",
"+I[XE, DEBEZIUM, PRODUCTS, 103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
"+I[XE, DEBEZIUM, PRODUCTS, 104, hammer, 12oz carpenters hammer, 0.750]",
"+I[XE, DEBEZIUM, PRODUCTS, 105, hammer, 14oz carpenters hammer, 0.875]",
"+I[XE, DEBEZIUM, PRODUCTS, 106, hammer, 16oz carpenters hammer, 1.000]",
"+I[XE, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.300]",
"+I[XE, DEBEZIUM, PRODUCTS, 108, jacket, water resistent black wind breaker, 0.100]",
"+I[XE, DEBEZIUM, PRODUCTS, 109, spare tire, 24 inch spare tire, 22.200]",
"+I[XE, DEBEZIUM, PRODUCTS, 111, jacket, water resistent white wind breaker, 0.200]",
"+I[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.180]",
"+U[XE, DEBEZIUM, PRODUCTS, 106, hammer, 18oz carpenter hammer, 1.000]",
"+U[XE, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.100]",
"+U[XE, DEBEZIUM, PRODUCTS, 111, jacket, new water resistent white wind breaker, 0.500]",
"+U[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]",
"-D[XE, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]");
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 101, scooter, Small 2-wheel scooter, 3.140]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 102, car battery, 12V car battery, 8.100]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 104, hammer, 12oz carpenters hammer, 0.750]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 105, hammer, 14oz carpenters hammer, 0.875]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 106, hammer, 16oz carpenters hammer, 1.000]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.300]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 108, jacket, water resistent black wind breaker, 0.100]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 109, spare tire, 24 inch spare tire, 22.200]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 111, jacket, water resistent white wind breaker, 0.200]",
"+I[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.180]",
"+U[ORCLCDB, DEBEZIUM, PRODUCTS, 106, hammer, 18oz carpenter hammer, 1.000]",
"+U[ORCLCDB, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.100]",
"+U[ORCLCDB, DEBEZIUM, PRODUCTS, 111, jacket, new water resistent white wind breaker, 0.500]",
"+U[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]",
"-D[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
Collections.sort(expected);
@ -419,6 +402,8 @@ public class OracleConnectorITCase extends AbstractTestBase {
@Test
public void testStartupFromLatestOffset() throws Exception {
createAndInitialize("product.sql");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -434,15 +419,14 @@ public class OracleConnectorITCase extends AbstractTestBase {
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.log.mining.continuous.mine' = 'true',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'database-name' = 'XE',"
+ " 'database-name' = 'ORCLCDB',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s' ,"
+ " 'scan.startup.mode' = 'latest-offset'"
+ ")",
oracleContainer.getHost(),
oracleContainer.getOraclePort(),
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
"dbzuser",
"dbz",
parallelismSnapshot,
@ -489,7 +473,7 @@ public class OracleConnectorITCase extends AbstractTestBase {
@Test
public void testConsumingNumericColumns() throws Exception {
// Prepare numeric type data
try (Connection connection = OracleTestUtils.testConnection(oracleContainer);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"CREATE TABLE debezium.test_numeric_table ("
@ -534,14 +518,13 @@ public class OracleConnectorITCase extends AbstractTestBase {
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.log.mining.continuous.mine' = 'true',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'database-name' = 'XE',"
+ " 'database-name' = 'ORCLCDB',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
oracleContainer.getHost(),
oracleContainer.getOraclePort(),
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
"dbzuser",
"dbz",
parallelismSnapshot,
@ -588,91 +571,9 @@ public class OracleConnectorITCase extends AbstractTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testXmlType() throws Exception {
// Prepare xml type data
try (Connection connection = OracleTestUtils.testConnection(oracleContainer);
Statement statement = connection.createStatement()) {
statement.execute(
"CREATE TABLE debezium.xmltype_table ("
+ " ID NUMBER(4),"
+ " T15VARCHAR sys.xmltype,"
+ " PRIMARY KEY (ID))");
statement.execute(
"INSERT INTO debezium.xmltype_table "
+ "VALUES (11, sys.xmlType.createXML('<name><a id=\"1\" value=\"some values\">test xmlType</a></name>'))");
}
String sourceDDL =
String.format(
"CREATE TABLE test_xmltype_table ("
+ " ID INT,"
+ " T15VARCHAR STRING,"
+ " PRIMARY KEY (ID) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'oracle-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.log.mining.continuous.mine' = 'true',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'scan.incremental.snapshot.chunk.size' = '2',"
+ " 'database-name' = 'XE',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
oracleContainer.getHost(),
oracleContainer.getOraclePort(),
"dbzuser",
"dbz",
parallelismSnapshot,
"debezium",
"xmltype_table");
String sinkDDL =
"CREATE TABLE test_xmltype_sink ("
+ " id INT,"
+ " T15VARCHAR STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '1'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result =
tEnv.executeSql("INSERT INTO test_xmltype_sink SELECT * FROM test_xmltype_table");
waitForSnapshotStarted("test_xmltype_sink");
// waiting for change events finished.
waitForSinkSize("test_xmltype_sink", 1);
String lineSeparator = System.getProperty("line.separator");
String expectedResult =
String.format(
"+I[11, <name>%s"
+ " <a id=\"1\" value=\"some values\">test xmlType</a>%s"
+ "</name>]",
lineSeparator, lineSeparator);
List<String> expected = Arrays.asList(expectedResult);
List<String> actual = TestValuesTableFactory.getRawResults("test_xmltype_sink");
Collections.sort(actual);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
@Test
public void testAllDataTypes() throws Throwable {
OracleTestUtils.createAndInitialize(
OracleTestUtils.ORACLE_CONTAINER, "column_type_test.sql");
createAndInitialize("column_type_test.sql");
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
String sourceDDL =
@ -728,15 +629,14 @@ public class OracleConnectorITCase extends AbstractTestBase {
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.log.mining.continuous.mine' = 'true',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'scan.incremental.snapshot.chunk.size' = '2',"
+ " 'database-name' = 'XE',"
+ " 'database-name' = 'ORCLCDB',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
oracleContainer.getHost(),
oracleContainer.getOraclePort(),
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
CONNECTOR_USER,
CONNECTOR_PWD,
parallelismSnapshot,
@ -778,7 +678,7 @@ public class OracleConnectorITCase extends AbstractTestBase {
+ "-93784560000, "
+ "<name>\n"
+ " <a id=\"1\" value=\"some values\">test xmlType</a>\n"
+ "</name>]"
+ "</name>\n]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
@ -809,14 +709,13 @@ public class OracleConnectorITCase extends AbstractTestBase {
+ " 'table-name' = 'category',"
+ " 'scan.incremental.snapshot.enabled' = 'false',"
+ " 'debezium.log.mining.strategy' = 'online_catalog',"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ " 'debezium.log.mining.continuous.mine' = 'true'"
+ " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'"
+ ")",
oracleContainer.getHost(),
oracleContainer.getOraclePort(),
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
"dbzuser",
"dbz",
"XE",
"ORCLCDB",
"debezium");
String sinkDDL =
@ -940,8 +839,4 @@ public class OracleConnectorITCase extends AbstractTestBase {
}
}
}
public Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(oracleContainer.getJdbcUrl(), "dbzuser", "dbz");
}
}

@ -1,104 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.utils;
import com.ververica.cdc.connectors.oracle.source.OracleSourceITCase;
import org.testcontainers.containers.OracleContainer;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
/** Utility class for oracle tests. */
public class OracleTestUtils {
// You can build OracleContainer from official oracle docker image in following way, we use
// prebuilt image for time cost consideration
// ----------------- begin --------------------------
// new OracleContainer(new ImageFromDockerfile("oracle-xe-11g-tmp")
// .withFileFromClasspath(".", "docker")
// .withFileFromClasspath(
// "assets/activate-archivelog.sh",
// "docker/assets/activate-archivelog.sh")
// .withFileFromClasspath(
// "assets/activate-archivelog.sql",
// "docker/assets/activate-archivelog.sql")
// ----------------- end --------------------------
private static final String ORACLE_IMAGE = "jark/oracle-xe-11g-r2-cdc:0.1";
protected static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
public static final OracleContainer ORACLE_CONTAINER = new OracleContainer(ORACLE_IMAGE);
public static final String CONNECTOR_USER = "dbzuser";
public static final String CONNECTOR_PWD = "dbz";
public static final String SCHEMA_USER = "debezium";
public static final String SCHEMA_PWD = "dbz";
public static Connection getJdbcConnection(OracleContainer oracleContainer)
throws SQLException {
return DriverManager.getConnection(
oracleContainer.getJdbcUrl(), CONNECTOR_USER, CONNECTOR_PWD);
}
public static Connection testConnection(OracleContainer oracleContainer) throws SQLException {
return DriverManager.getConnection(oracleContainer.getJdbcUrl(), SCHEMA_USER, SCHEMA_PWD);
}
public static void createAndInitialize(OracleContainer oracleContainer, String sqlFile)
throws Exception {
final String ddlFile = String.format("ddl/%s", sqlFile);
final URL ddlTestFile = OracleSourceITCase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = OracleTestUtils.testConnection(oracleContainer);
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
}
}
}
Loading…
Cancel
Save