[tests] Add end-to-end tests for cdc connector uber jars (#594)

commit 61ee775b49 (pull/605/head)
parent dc5a0c20df
Author: Jark Wu

@@ -47,15 +47,3 @@ stages:
vmImage: 'ubuntu-20.04'
run_end_to_end: false
jdk: 8
#steps:
# - task: Maven@3
# inputs:
# mavenPomFile: 'pom.xml'
# mavenOptions: '-Xmx3072m -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120'
# javaHomeOption: 'JDKVersion'
# jdkVersionOption: '1.8'
# jdkArchitectureOption: 'x64'
# publishJUnitResults: true
# testResultsFiles: '**/surefire-reports/TEST-*.xml'
# goals: 'clean verify'

@@ -0,0 +1,227 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.ververica</groupId>
<version>2.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-cdc-e2e-tests</artifactId>
<name>flink-cdc-e2e-tests</name>
<packaging>jar</packaging>
<properties>
<flink-1.13>1.13.3</flink-1.13>
<mysql.driver.version>8.0.27</mysql.driver.version>
</properties>
<dependencies>
<!-- Drivers -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
<version>${mysql.driver.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.3.1</version>
</dependency>
<!-- CDC connectors test utils -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-test-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- testcontainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>oracle-xe</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>default-test</id>
<phase>none</phase>
</execution>
<execution>
<id>integration-tests</id>
<phase>none</phase>
</execution>
<execution>
<id>end-to-end-tests</id>
<phase>integration-test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<includes>
<include>**/*.*</include>
</includes>
<forkCount>1</forkCount>
<systemPropertyVariables>
<moduleDir>${project.basedir}</moduleDir>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-jars</id>
<phase>process-resources</phase>
<goals>
<goal>copy</goal>
</goals>
</execution>
</executions>
<configuration>
<artifactItems>
<artifactItem>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.driver.version}</version>
<destFileName>mysql-driver.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink-1.13}</version>
<destFileName>jdbc-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${project.version}</version>
<destFileName>mysql-cdc-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${project.version}</version>
<destFileName>postgres-cdc-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<version>${project.version}</version>
<destFileName>mongodb-cdc-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${project.version}</version>
<destFileName>oracle-cdc-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</plugin>
</plugins>
</build>
</project>
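
The surefire execution above exposes ${project.basedir} to the tests as the moduleDir system property, and the dependency plugin stages every uber jar under target/dependencies with a stable file name during process-resources. A minimal sketch of how the two fit together (plain JDK code; the class name is hypothetical):

// Hypothetical sketch: resolve a staged connector jar from the moduleDir property.
import java.nio.file.Path;
import java.nio.file.Paths;

public class StagedJarLayoutSketch {
    public static void main(String[] args) {
        // surefire sets -DmoduleDir=${project.basedir}; fall back to the working directory.
        Path moduleDir = Paths.get(System.getProperty("moduleDir", "")).toAbsolutePath();
        // maven-dependency-plugin copied the jar here during process-resources.
        Path mysqlCdcJar = moduleDir.resolve("target/dependencies/mysql-cdc-connector.jar");
        System.out.println(mysqlCdcJar);
    }
}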

@@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import com.ververica.cdc.connectors.mongodb.MongoDBTestBase;
import com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer;
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGODB_PORT;
import static org.junit.Assert.assertNotNull;
/** End-to-end tests for mongodb-cdc connector uber jar. */
public class MongoE2eITCase extends FlinkContainerTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(MongoE2eITCase.class);
private static final String MONGO_TEST_USER = "flinkuser";
private static final String MONGO_TEST_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
private static final String MONGO_SUPER_USER = "superuser";
private static final String MONGO_SUPER_PASSWORD = "superpw";
private static final String INTER_CONTAINER_MONGO_ALIAS = "mongodb";
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private static final Path mongoCdcJar = TestUtils.getResource("mongodb-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ClassRule
public static final MongoDBContainer MONGODB =
new MongoDBContainer()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
private static MongoClient mongoClient;
@BeforeClass
public static void beforeClass() {
executeCommandFileInMongoDB("mongo_setup", "admin");
MongoClientSettings settings =
MongoClientSettings.builder()
.applyConnectionString(
new ConnectionString(
MONGODB.getConnectionString(
MONGO_SUPER_USER, MONGO_SUPER_PASSWORD)))
.build();
mongoClient = MongoClients.create(settings);
}
@AfterClass
public static void afterClass() {
if (mongoClient != null) {
mongoClient.close();
}
}
@Test
public void testMongoDbCDC() throws Exception {
String dbName =
executeCommandFileInMongoDB(
"mongo_inventory",
"inventory" + Integer.toUnsignedString(new Random().nextInt(), 36));
List<String> sqlLines =
Arrays.asList(
"CREATE TABLE products_source (",
" _id STRING NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (_id) not enforced",
") WITH (",
" 'connector' = 'mongodb-cdc',",
" 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000',",
" 'hosts' = '" + INTER_CONTAINER_MONGO_ALIAS + ":" + MONGODB_PORT + "',",
" 'database' = '" + dbName + "',",
" 'username' = '" + MONGO_TEST_USER + "',",
" 'password' = '" + MONGO_TEST_PASSWORD + "',",
" 'collection' = 'products',",
" 'heartbeat.interval.ms' = '1000'",
");",
"CREATE TABLE mongodb_products_sink (",
" `id` STRING NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'jdbc',",
String.format(
" 'url' = 'jdbc:mysql://%s:3306/%s',",
INTER_CONTAINER_MYSQL_ALIAS,
mysqlInventoryDatabase.getDatabaseName()),
" 'table-name' = 'mongodb_products_sink',",
" 'username' = '" + MYSQL_TEST_USER + "',",
" 'password' = '" + MYSQL_TEST_PASSWORD + "'",
");",
"INSERT INTO mongodb_products_sink",
"SELECT * FROM products_source;");
submitSQLJob(sqlLines, mongoCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
// generate oplog events
MongoCollection<Document> products =
mongoClient.getDatabase(dbName).getCollection("products");
products.updateOne(
Filters.eq("_id", new ObjectId("100000000000000000000106")),
Updates.set("description", "18oz carpenter hammer"));
products.updateOne(
Filters.eq("_id", new ObjectId("100000000000000000000107")),
Updates.set("weight", 5.1));
products.insertOne(
productDocOf(
"100000000000000000000110",
"jacket",
"water resistent white wind breaker",
0.2));
products.insertOne(
productDocOf("100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18));
products.updateOne(
Filters.eq("_id", new ObjectId("100000000000000000000110")),
Updates.combine(
Updates.set("description", "new water resistent white wind breaker"),
Updates.set("weight", 0.5)));
products.updateOne(
Filters.eq("_id", new ObjectId("100000000000000000000111")),
Updates.set("weight", 5.17));
products.deleteOne(Filters.eq("_id", new ObjectId("100000000000000000000111")));
// assert final results
String mysqlUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
JdbcProxy proxy =
new JdbcProxy(mysqlUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS);
List<String> expectResult =
Arrays.asList(
"100000000000000000000101,scooter,Small 2-wheel scooter,3.14",
"100000000000000000000102,car battery,12V car battery,8.1",
"100000000000000000000103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8",
"100000000000000000000104,hammer,12oz carpenter's hammer,0.75",
"100000000000000000000105,hammer,14oz carpenter's hammer,0.875",
"100000000000000000000106,hammer,18oz carpenter hammer,1.0",
"100000000000000000000107,rocks,box of assorted rocks,5.1",
"100000000000000000000108,jacket,water resistent black wind breaker,0.1",
"100000000000000000000109,spare tire,24 inch spare tire,22.2",
"100000000000000000000110,jacket,new water resistent white wind breaker,0.5");
proxy.checkResultWithTimeout(
expectResult,
"mongodb_products_sink",
new String[] {"id", "name", "description", "weight"},
60000L);
}
/** Executes a mongo command file in the given database (defaults to the file name) and returns the database name. */
private static String executeCommandFileInMongoDB(
String fileNameIgnoreSuffix, String databaseName) {
final String dbName = databaseName != null ? databaseName : fileNameIgnoreSuffix;
final String ddlFile = String.format("ddl/%s.js", fileNameIgnoreSuffix);
final URL ddlTestFile = MongoDBTestBase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try {
// use database;
String command0 = String.format("db = db.getSiblingDB('%s');\n", dbName);
String command1 =
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("//") && !x.isEmpty())
.map(
x -> {
final Matcher m = COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"));
MONGODB.executeCommand(command0 + command1);
return dbName;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Document productDocOf(String id, String name, String description, Double weight) {
Document document = new Document();
if (id != null) {
document.put("_id", new ObjectId(id));
}
document.put("name", name);
document.put("description", description);
document.put("weight", weight);
return document;
}
}

@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests;
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
/** End-to-end tests for mysql-cdc connector uber jar. */
public class MySqlE2eITCase extends FlinkContainerTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(MySqlE2eITCase.class);
private static final Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
@Test
public void testMySqlCDC() throws Exception {
List<String> sqlLines =
Arrays.asList(
"CREATE TABLE products_source (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" enum_c STRING,",
" json_c STRING,",
" point_c STRING,",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'mysql-cdc',",
" 'hostname' = '" + INTER_CONTAINER_MYSQL_ALIAS + "',",
" 'port' = '3306',",
" 'username' = '" + MYSQL_TEST_USER + "',",
" 'password' = '" + MYSQL_TEST_PASSWORD + "',",
" 'database-name' = '" + mysqlInventoryDatabase.getDatabaseName() + "',",
" 'table-name' = 'products_source',",
" 'server-id' = '5800-5900',",
" 'scan.incremental.snapshot.chunk.size' = '4'",
");",
"CREATE TABLE products_sink (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" enum_c STRING,",
" json_c STRING,",
" point_c STRING,",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'jdbc',",
String.format(
" 'url' = 'jdbc:mysql://%s:3306/%s',",
INTER_CONTAINER_MYSQL_ALIAS,
mysqlInventoryDatabase.getDatabaseName()),
" 'table-name' = 'products_sink',",
" 'username' = '" + MYSQL_TEST_USER + "',",
" 'password' = '" + MYSQL_TEST_PASSWORD + "'",
");",
"INSERT INTO products_sink",
"SELECT * FROM products_source;");
submitSQLJob(sqlLines, mysqlCdcJar, jdbcJar);
waitUntilJobRunning(Duration.ofSeconds(30));
// generate binlogs
String jdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
try (Connection conn =
DriverManager.getConnection(jdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stat = conn.createStatement()) {
stat.execute(
"UPDATE products_source SET description='18oz carpenter hammer' WHERE id=106;");
stat.execute("UPDATE products_source SET weight='5.1' WHERE id=107;");
stat.execute(
"INSERT INTO products_source VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110
stat.execute(
"INSERT INTO products_source VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null);");
stat.execute(
"UPDATE products_source SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
stat.execute("UPDATE products_source SET weight='5.17' WHERE id=111;");
stat.execute("DELETE FROM products_source WHERE id=111;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
// assert final results
JdbcProxy proxy =
new JdbcProxy(jdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS);
List<String> expectResult =
Arrays.asList(
"101,scooter,Small 2-wheel scooter,3.14,red,{\"key1\": \"value1\"},{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
"102,car battery,12V car battery,8.1,white,{\"key2\": \"value2\"},{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
"103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8,red,{\"key3\": \"value3\"},{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
"104,hammer,12oz carpenter's hammer,0.75,white,{\"key4\": \"value4\"},{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
"105,hammer,14oz carpenter's hammer,0.875,red,{\"k1\": \"v1\", \"k2\": \"v2\"},{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
"106,hammer,18oz carpenter hammer,1.0,null,null,null",
"107,rocks,box of assorted rocks,5.1,null,null,null",
"108,jacket,water resistent black wind breaker,0.1,null,null,null",
"109,spare tire,24 inch spare tire,22.2,null,null,null",
"110,jacket,new water resistent white wind breaker,0.5,null,null,null");
proxy.checkResultWithTimeout(
expectResult,
"products_sink",
new String[] {"id", "name", "description", "weight", "enum_c", "json_c", "point_c"},
60000L);
}
}

@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests;
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
/** End-to-end tests for oracle-cdc connector uber jar. */
public class OracleE2eITCase extends FlinkContainerTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(OracleE2eITCase.class);
private static final String ORACLE_TEST_USER = "dbzuser";
private static final String ORACLE_TEST_PASSWORD = "dbz";
private static final String ORACLE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver";
private static final String INTER_CONTAINER_ORACLE_ALIAS = "oracle";
private static final String ORACLE_IMAGE = "jark/oracle-xe-11g-r2-cdc:0.1";
private static final int ORACLE_PORT = 1521;
private static final Path oracleCdcJar = TestUtils.getResource("oracle-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ClassRule
public static final OracleContainer ORACLE =
new OracleContainer(ORACLE_IMAGE)
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_ORACLE_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Test
public void testOracleCDC() throws Exception {
List<String> sqlLines =
Arrays.asList(
"CREATE TABLE products_source (",
" ID INT NOT NULL,",
" NAME STRING,",
" DESCRIPTION STRING,",
" WEIGHT DECIMAL(10,3),",
" primary key (`ID`) not enforced",
") WITH (",
" 'connector' = 'oracle-cdc',",
" 'hostname' = '" + INTER_CONTAINER_ORACLE_ALIAS + "',",
" 'port' = '" + ORACLE_PORT + "',",
" 'username' = '" + ORACLE_TEST_USER + "',",
" 'password' = '" + ORACLE_TEST_PASSWORD + "',",
" 'database-name' = 'XE',",
" 'schema-name' = 'debezium',",
" 'table-name' = 'products'",
");",
"CREATE TABLE products_sink (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'jdbc',",
String.format(
" 'url' = 'jdbc:mysql://%s:3306/%s',",
INTER_CONTAINER_MYSQL_ALIAS,
mysqlInventoryDatabase.getDatabaseName()),
" 'table-name' = 'products_sink',",
" 'username' = '" + MYSQL_TEST_USER + "',",
" 'password' = '" + MYSQL_TEST_PASSWORD + "'",
");",
"INSERT INTO products_sink",
"SELECT * FROM products_source;");
submitSQLJob(sqlLines, oracleCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
// generate redo log entries
Class.forName(ORACLE_DRIVER_CLASS);
// we need to set this property, otherwise the Azure Pipelines build fails with an
// "ORA-01882: timezone region not found" error when establishing the Oracle JDBC connection
// see https://stackoverflow.com/a/9177263/4915129
System.setProperty("oracle.jdbc.timezoneAsRegion", "false");
try (Connection conn = getOracleJdbcConnection();
Statement statement = conn.createStatement()) {
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
statement.execute(
"INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)");
statement.execute(
"INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
statement.execute("DELETE FROM debezium.products WHERE ID=112");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
// assert final results
String mysqlUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
JdbcProxy proxy =
new JdbcProxy(mysqlUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS);
List<String> expectResult =
Arrays.asList(
"101,scooter,Small 2-wheel scooter,3.14",
"102,car battery,12V car battery,8.1",
"103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8",
"104,hammer,12oz carpenters hammer,0.75",
"105,hammer,14oz carpenters hammer,0.875",
"106,hammer,18oz carpenter hammer,1.0",
"107,rocks,box of assorted rocks,5.1",
"108,jacket,water resistent black wind breaker,0.1",
"109,spare tire,24 inch spare tire,22.2",
"111,jacket,new water resistent white wind breaker,0.5");
proxy.checkResultWithTimeout(
expectResult,
"products_sink",
new String[] {"id", "name", "description", "weight"},
60000L);
}
private Connection getOracleJdbcConnection() throws SQLException {
return DriverManager.getConnection(
ORACLE.getJdbcUrl(), ORACLE_TEST_USER, ORACLE_TEST_PASSWORD);
}
}

@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests;
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
/** End-to-end tests for postgres-cdc connector uber jar. */
public class PostgresE2eITCase extends FlinkContainerTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(PostgresE2eITCase.class);
private static final String PG_TEST_USER = "postgres";
private static final String PG_TEST_PASSWORD = "postgres";
protected static final String PG_DRIVER_CLASS = "org.postgresql.Driver";
private static final String INTER_CONTAINER_PG_ALIAS = "postgres";
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private static final DockerImageName PG_IMAGE =
DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres");
private static final Path postgresCdcJar = TestUtils.getResource("postgres-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ClassRule
public static final PostgreSQLContainer<?> POSTGRES =
new PostgreSQLContainer<>(PG_IMAGE)
.withDatabaseName("postgres")
.withUsername(PG_TEST_USER)
.withPassword(PG_TEST_PASSWORD)
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_PG_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Before
public void before() {
super.before();
initializePostgresTable("postgres_inventory");
}
@Test
public void testPostgresCDC() throws Exception {
List<String> sqlLines =
Arrays.asList(
"CREATE TABLE products_source (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'postgres-cdc',",
" 'hostname' = '" + INTER_CONTAINER_PG_ALIAS + "',",
" 'port' = '" + POSTGRESQL_PORT + "',",
" 'username' = '" + PG_TEST_USER + "',",
" 'password' = '" + PG_TEST_PASSWORD + "',",
" 'database-name' = '" + POSTGRES.getDatabaseName() + "',",
" 'schema-name' = 'inventory',",
" 'table-name' = 'products',",
// dropping the slot allows WAL segments to be discarded by the database
" 'debezium.slot.drop_on_stop' = 'true'",
");",
"CREATE TABLE products_sink (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'jdbc',",
String.format(
" 'url' = 'jdbc:mysql://%s:3306/%s',",
INTER_CONTAINER_MYSQL_ALIAS,
mysqlInventoryDatabase.getDatabaseName()),
" 'table-name' = 'products_sink',",
" 'username' = '" + MYSQL_TEST_USER + "',",
" 'password' = '" + MYSQL_TEST_PASSWORD + "'",
");",
"INSERT INTO products_sink",
"SELECT * FROM products_source;");
submitSQLJob(sqlLines, postgresCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
// generate WAL entries
try (Connection conn = getPgJdbcConnection();
Statement statement = conn.createStatement()) {
statement.execute(
"UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM inventory.products WHERE id=111;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
// assert final results
String mysqlUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
JdbcProxy proxy =
new JdbcProxy(mysqlUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS);
List<String> expectResult =
Arrays.asList(
"101,scooter,Small 2-wheel scooter,3.14",
"102,car battery,12V car battery,8.1",
"103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8",
"104,hammer,12oz carpenter's hammer,0.75",
"105,hammer,14oz carpenter's hammer,0.875",
"106,hammer,18oz carpenter hammer,1.0",
"107,rocks,box of assorted rocks,5.1",
"108,jacket,water resistent black wind breaker,0.1",
"109,spare tire,24 inch spare tire,22.2",
"110,jacket,new water resistent white wind breaker,0.5");
proxy.checkResultWithTimeout(
expectResult,
"products_sink",
new String[] {"id", "name", "description", "weight"},
60000L);
}
/** Initializes the Postgres tables by executing all statements in the given DDL file. */
private void initializePostgresTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile = PostgresE2eITCase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try {
Class.forName(PG_DRIVER_CLASS);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
try (Connection connection = getPgJdbcConnection();
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Connection getPgJdbcConnection() throws SQLException {
return DriverManager.getConnection(
POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), POSTGRES.getPassword());
}
}

@@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests.utils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.MountableFile;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkState;
/** Test environment running job on Flink containers. */
public abstract class FlinkContainerTestEnvironment extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnvironment.class);
// ------------------------------------------------------------------------------------------
// Flink Variables
// ------------------------------------------------------------------------------------------
private static final int JOB_MANAGER_REST_PORT = 8081;
private static final String FLINK_BIN = "bin";
private static final String FLINK_IMAGE_TAG = "flink:1.13.3-scala_2.11";
private static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
private static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
private static final String FLINK_PROPERTIES =
String.join(
"\n",
Arrays.asList(
"jobmanager.rpc.address: jobmanager",
"taskmanager.numberOfTaskSlots: 10",
"parallelism.default: 4",
// this is needed for oracle-cdc tests.
// see https://stackoverflow.com/a/47062742/4915129
"env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
// ------------------------------------------------------------------------------------------
// MySQL Variables (we always use MySQL as the sink for easier verifying)
// ------------------------------------------------------------------------------------------
protected static final String MYSQL_TEST_USER = "mysqluser";
protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
@ClassRule public static final Network NETWORK = Network.newNetwork();
@ClassRule
public static final MySqlContainer MYSQL =
(MySqlContainer)
new MySqlContainer()
.withConfigurationOverride("docker/mysql/my.cnf")
.withSetupSQL("docker/mysql/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public final GenericContainer<?> jobmanager =
new GenericContainer<>(FLINK_IMAGE_TAG)
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Rule
public final GenericContainer<?> taskmanager =
new GenericContainer<>(FLINK_IMAGE_TAG)
.withCommand("taskmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.dependsOn(jobmanager)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Nullable private RestClusterClient<StandaloneClusterId> restClusterClient;
protected final UniqueDatabase mysqlInventoryDatabase =
new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
@Before
public void before() {
mysqlInventoryDatabase.createAndInitialize();
}
@After
public void after() {
if (restClusterClient != null) {
restClusterClient.close();
}
}
/**
* Submits a SQL job to the running cluster.
*
* <p><b>NOTE:</b> The SQL lines must not contain {@code '\t'}.
*/
public void submitSQLJob(List<String> sqlLines, Path... jars)
throws IOException, InterruptedException {
SQLJobSubmission job =
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines).addJars(jars).build();
final List<String> commands = new ArrayList<>();
Path script = temporaryFolder.newFile().toPath();
Files.write(script, job.getSqlLines());
jobmanager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql");
commands.add("cat /tmp/script.sql | ");
commands.add(FLINK_BIN + "/sql-client.sh");
for (String jar : job.getJars()) {
commands.add("--jar");
String containerPath = copyAndGetContainerPath(jobmanager, jar);
commands.add(containerPath);
}
ExecResult execResult =
jobmanager.execInContainer("bash", "-c", String.join(" ", commands));
LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr());
if (execResult.getExitCode() != 0) {
throw new AssertionError("Failed when submitting the SQL job.");
}
}
/**
* Get {@link RestClusterClient} connected to this FlinkContainer.
*
* <p>This method lazily initializes the REST client on-demand.
*/
public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
if (restClusterClient != null) {
return restClusterClient;
}
checkState(
jobmanager.isRunning(),
"Cluster client should only be retrieved for a running cluster");
try {
final Configuration clientConfiguration = new Configuration();
clientConfiguration.set(RestOptions.ADDRESS, jobmanager.getHost());
clientConfiguration.set(
RestOptions.PORT, jobmanager.getMappedPort(JOB_MANAGER_REST_PORT));
this.restClusterClient =
new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance());
} catch (Exception e) {
throw new IllegalStateException(
"Failed to create client for Flink container cluster", e);
}
return restClusterClient;
}
public void waitUntilJobRunning(Duration timeout) {
RestClusterClient<?> clusterClient = getRestClusterClient();
Deadline deadline = Deadline.fromNow(timeout);
while (deadline.hasTimeLeft()) {
Collection<JobStatusMessage> jobStatusMessages;
try {
jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Error when fetching job status.", e);
continue;
}
if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
JobStatusMessage message = jobStatusMessages.iterator().next();
JobStatus jobStatus = message.getJobState();
if (jobStatus.isTerminalState()) {
throw new ValidationException(
String.format(
"Job has been terminated! JobName: %s, JobID: %s, Status: %s",
message.getJobName(),
message.getJobId(),
message.getJobState()));
} else if (jobStatus == JobStatus.RUNNING) {
return;
}
}
}
}
private String copyAndGetContainerPath(GenericContainer<?> container, String filePath) {
Path path = Paths.get(filePath);
String containerPath = "/tmp/" + path.getFileName();
container.copyFileToContainer(MountableFile.forHostPath(path), containerPath);
return containerPath;
}
}
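
For orientation, a hedged sketch of how a concrete test drives this environment; the datagen-to-blackhole job is illustrative only and not part of this commit:

import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/** Hypothetical subclass sketching the intended usage of the environment. */
public class SmokeE2eITCase extends FlinkContainerTestEnvironment {
    @Test
    public void testTrivialPipeline() throws Exception {
        List<String> sqlLines =
                Arrays.asList(
                        "CREATE TABLE src (a INT) WITH ('connector' = 'datagen', 'rows-per-second' = '1');",
                        "CREATE TABLE dst (a INT) WITH ('connector' = 'blackhole');",
                        "INSERT INTO dst SELECT a FROM src;");
        // datagen and blackhole ship with Flink, so no extra --jar arguments are needed.
        submitSQLJob(sqlLines);
        waitUntilJobRunning(Duration.ofSeconds(30));
    }
}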

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests.utils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
/** Proxy to communicate with database using JDBC protocol. */
public class JdbcProxy {
private final String url;
private final String userName;
private final String password;
private final String driverClass;
public JdbcProxy(String url, String userName, String password, String driverClass) {
this.url = url;
this.userName = userName;
this.password = password;
this.driverClass = driverClass;
}
public void checkResult(List<String> expectedResult, String table, String[] fields)
throws SQLException, ClassNotFoundException {
Class.forName(driverClass);
try (Connection dbConn = DriverManager.getConnection(url, userName, password);
PreparedStatement statement = dbConn.prepareStatement("select * from " + table);
ResultSet resultSet = statement.executeQuery()) {
List<String> results = new ArrayList<>();
while (resultSet.next()) {
List<String> result = new ArrayList<>();
for (String field : fields) {
Object value = resultSet.getObject(field);
if (value == null) {
result.add("null");
} else {
result.add(value.toString());
}
}
results.add(StringUtils.join(result, ","));
}
Collections.sort(results);
Collections.sort(expectedResult);
assertEquals(expectedResult, results);
}
}
public void checkResultWithTimeout(
List<String> expectedResult, String table, String[] fields, long timeout)
throws Exception {
long endTimeout = System.currentTimeMillis() + timeout;
boolean result = false;
while (System.currentTimeMillis() < endTimeout) {
try {
checkResult(expectedResult, table, fields);
result = true;
break;
} catch (AssertionError | SQLException throwable) {
Thread.sleep(1000L);
}
}
if (!result) {
checkResult(expectedResult, table, fields);
}
}
}
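
Usage mirrors the assertions in the *E2eITCase classes above; a minimal standalone sketch (URL, credentials, and the expected row are placeholders):

// Minimal sketch: poll a sink table until its contents match the expected rows.
import java.util.Arrays;

public class JdbcProxyUsageSketch {
    public static void main(String[] args) throws Exception {
        JdbcProxy proxy =
                new JdbcProxy(
                        "jdbc:mysql://localhost:3306/inventory", // placeholder URL
                        "user",
                        "password",
                        "com.mysql.cj.jdbc.Driver");
        // Retries once per second for up to 60s, then re-asserts one final time.
        proxy.checkResultWithTimeout(
                Arrays.asList("101,scooter,Small 2-wheel scooter,3.14"),
                "products_sink",
                new String[] {"id", "name", "description", "weight"},
                60000L);
    }
}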

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests.utils;
import java.util.function.Function;
/** System-property based parameters for tests and resources. */
public class ParameterProperty<V> {
private final String propertyName;
private final Function<String, V> converter;
public ParameterProperty(final String propertyName, final Function<String, V> converter) {
this.propertyName = propertyName;
this.converter = converter;
}
/**
* Retrieves the value of this property, or the given default if no value was set.
*
* @return the value of this property, or the given default if no value was set
*/
public V get(final V defaultValue) {
final String value = System.getProperty(propertyName);
return value == null ? defaultValue : converter.apply(value);
}
}
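
A short sketch of the intended usage, mirroring how TestUtils below consumes the moduleDir property:

// Minimal sketch: typed access to a -D system property with a fallback default.
import java.nio.file.Path;
import java.nio.file.Paths;

public class ParameterPropertySketch {
    private static final ParameterProperty<Path> MODULE_DIRECTORY =
            new ParameterProperty<>("moduleDir", Paths::get);

    public static void main(String[] args) {
        // Falls back to the working directory when -DmoduleDir is not set.
        Path moduleDir = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
        System.out.println(moduleDir);
    }
}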

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests.utils;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Programmatic definition of a SQL job-submission. */
public class SQLJobSubmission {
private final List<String> sqlLines;
private final List<String> jars;
private SQLJobSubmission(List<String> sqlLines, List<String> jars) {
this.sqlLines = checkNotNull(sqlLines);
this.jars = checkNotNull(jars);
}
public List<String> getJars() {
return this.jars;
}
public List<String> getSqlLines() {
return this.sqlLines;
}
/** Builder for the {@link SQLJobSubmission}. */
public static class SQLJobSubmissionBuilder {
private final List<String> sqlLines;
private final List<String> jars = new ArrayList<>();
public SQLJobSubmissionBuilder(List<String> sqlLines) {
this.sqlLines = sqlLines;
}
public SQLJobSubmissionBuilder addJar(Path jarFile) {
this.jars.add(jarFile.toAbsolutePath().toString());
return this;
}
public SQLJobSubmissionBuilder addJars(Path... jarFiles) {
for (Path jarFile : jarFiles) {
addJar(jarFile);
}
return this;
}
public SQLJobSubmission build() {
return new SQLJobSubmission(sqlLines, jars);
}
}
}
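
The builder is consumed by FlinkContainerTestEnvironment#submitSQLJob above; a minimal sketch of standalone usage (the jar path is a placeholder):

// Minimal sketch: assemble a job submission from SQL lines plus connector jars.
import java.nio.file.Paths;
import java.util.Arrays;

public class SqlJobSubmissionSketch {
    public static void main(String[] args) {
        SQLJobSubmission job =
                new SQLJobSubmission.SQLJobSubmissionBuilder(Arrays.asList("SELECT 1;"))
                        .addJars(Paths.get("target/dependencies/jdbc-connector.jar"))
                        .build();
        System.out.println(job.getJars()); // absolute paths of the attached jars
    }
}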

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests.utils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** General test utilities. */
public enum TestUtils {
;
private static final ParameterProperty<Path> MODULE_DIRECTORY =
new ParameterProperty<>("moduleDir", Paths::get);
/**
* Searches for a resource file matching the given regex in the given directory. This method is
* primarily intended to be used for the initialization of static {@link Path} fields for
* resource files (i.e. jar, config files) that reside in the module's {@code target} directory.
*
* @param resourceNameRegex regex pattern to match against
* @return Path pointing to the matching jar
* @throws RuntimeException if none or multiple resource files could be found
*/
public static Path getResource(final String resourceNameRegex) {
// if the property is not set, then we are most likely running in the IDE, where the working
// directory is the module of the test that is currently running, which is exactly what we want
Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());
try (Stream<Path> dependencyResources = Files.walk(moduleDirectory)) {
final List<Path> matchingResources =
dependencyResources
.filter(
jar ->
Pattern.compile(resourceNameRegex)
.matcher(jar.toAbsolutePath().toString())
.find())
.collect(Collectors.toList());
switch (matchingResources.size()) {
case 0:
throw new RuntimeException(
new FileNotFoundException(
String.format(
"No resource file could be found that matches the pattern %s. "
+ "This could mean that the test module must be rebuilt via maven.",
resourceNameRegex)));
case 1:
return matchingResources.get(0);
default:
throw new RuntimeException(
new IOException(
String.format(
"Multiple resource files were found matching the pattern %s. Matches=%s",
resourceNameRegex, matchingResources)));
}
} catch (final IOException ioe) {
throw new RuntimeException("Could not search for resource resource files.", ioe);
}
}
}
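
Note that the argument is compiled as a regex and matched with find(), so a plain file name such as "mysql-cdc-connector.jar" matches as a substring (the dots matching any character are harmless here). A minimal usage sketch:

// Minimal sketch: locate a staged uber jar under the module directory.
import java.nio.file.Path;

public class GetResourceSketch {
    public static void main(String[] args) {
        // Throws a RuntimeException if zero or multiple files match the pattern.
        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-connector.jar");
        System.out.println(mysqlCdcJar);
    }
}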

@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
db.getCollection('products').insertMany([
{
"_id": ObjectId("100000000000000000000101"),
"name": "scooter",
"description": "Small 2-wheel scooter",
"weight": 3.14
},
{
"_id": ObjectId("100000000000000000000102"),
"name": "car battery",
"description": "12V car battery",
"weight": 8.1
},
{
"_id": ObjectId("100000000000000000000103"),
"name": "12-pack drill bits",
"description": "12-pack of drill bits with sizes ranging from #40 to #3",
"weight": 0.8
},
{
"_id": ObjectId("100000000000000000000104"),
"name": "hammer",
"description": "12oz carpenter's hammer",
"weight": 0.75
},
{
"_id": ObjectId("100000000000000000000105"),
"name": "hammer",
"description": "14oz carpenter's hammer",
"weight": 0.875
},
{
"_id": ObjectId("100000000000000000000106"),
"name": "hammer",
"description": "12oz carpenter's hammer",
"weight": 1.0
},
{
"_id": ObjectId("100000000000000000000107"),
"name": "rocks",
"description": "box of assorted rocks",
"weight": 5.3
},
{
"_id": ObjectId("100000000000000000000108"),
"name": "jacket",
"description": "water resistent black wind breaker",
"weight": 0.1
},
{
"_id": ObjectId("100000000000000000000109"),
"name": "spare tire",
"description": "24 inch spare tire",
"weight": 22.2
}
]);

@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// In production you would almost certainly limit the replication user to be on the follower (secondary) machine,
// to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
// However, in this database we'll grant two users different privileges:
//
// 1) 'flinkuser' - all privileges required by the snapshot reader AND oplog reader (used for testing)
// 2) 'superuser' - all privileges
//
//use admin;
if (db.system.users.find({user:'superuser'}).count() == 0) {
db.createUser(
{
user: 'superuser',
pwd: 'superpw',
roles: [ { role: 'root', db: 'admin' } ]
}
);
}
if (db.system.users.find({user:'flinkuser'}).count() == 0) {
db.createUser(
{
user: 'flinkuser',
pwd: 'a1?~!@#$%^&*(){}[]<>.,+_-=/|:;',
roles: [
{ role: 'read', db: 'admin' },
{ role: 'readAnyDatabase', db: 'admin' }
]
}
);
}
rs.status()

@@ -0,0 +1,59 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: mysql_inventory
-- ----------------------------------------------------------------------------------------------------------------
-- Create and populate our products using a single insert with many rows
CREATE TABLE products_source (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight FLOAT,
enum_c enum('red', 'white') default 'red', -- test some complex types as well,
json_c JSON, -- because we use additional dependencies to deserialize complex types.
point_c POINT
);
ALTER TABLE products_source AUTO_INCREMENT = 101;
INSERT INTO products_source
VALUES (default,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": "value1"}', ST_GeomFromText('POINT(1 1)')),
(default,"car battery","12V car battery",8.1, 'white', '{"key2": "value2"}', ST_GeomFromText('POINT(2 2)')),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 3)')),
(default,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": "value4"}', ST_GeomFromText('POINT(4 4)')),
(default,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", "k2": "v2"}', ST_GeomFromText('POINT(5 5)')),
(default,"hammer","16oz carpenter's hammer",1.0, null, null, null),
(default,"rocks","box of assorted rocks",5.3, null, null, null),
(default,"jacket","water resistent black wind breaker",0.1, null, null, null),
(default,"spare tire","24 inch spare tire",22.2, null, null, null);
CREATE TABLE products_sink (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight FLOAT,
enum_c VARCHAR(255),
json_c VARCHAR(255),
point_c VARCHAR(255)
);
CREATE TABLE mongodb_products_sink (
id VARCHAR (255) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight FLOAT
);
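The e2e tests exercise this schema by reading products_source through the mysql-cdc connector and mirroring the rows into the sink tables. A rough, hand-written sketch of such a DataStream job (not the actual test code) follows, using the 'mysqluser' account from the companion setup script; hostname and port are placeholders for the container's mapped address.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class MySqlInventorySmokeJob {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost") // assumption: container host
                        .port(3306)            // assumption: container mapped port
                        .databaseList("mysql_inventory")
                        .tableList("mysql_inventory.products_source")
                        .username("mysqluser")
                        .password("mysqlpw")
                        .deserializer(new StringDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // print the raw change events; the real tests assert on sink table contents instead
        env.addSource(source).print();
        env.execute("mysql-inventory-smoke");
    }
}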

@ -0,0 +1,40 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- Create the schema that we'll use to populate data and watch the effect in the WAL
DROP SCHEMA IF EXISTS inventory CASCADE;
CREATE SCHEMA inventory;
SET search_path TO inventory;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id SERIAL NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight FLOAT
);
ALTER SEQUENCE products_id_seq RESTART WITH 101;
ALTER TABLE products REPLICA IDENTITY FULL;
INSERT INTO products
VALUES (default,'scooter','Small 2-wheel scooter',3.14),
(default,'car battery','12V car battery',8.1),
(default,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
(default,'hammer','12oz carpenter''s hammer',0.75),
(default,'hammer','14oz carpenter''s hammer',0.875),
(default,'hammer','16oz carpenter''s hammer',1.0),
(default,'rocks','box of assorted rocks',5.3),
(default,'jacket','water resistent black wind breaker',0.1),
(default,'spare tire','24 inch spare tire',22.2);
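REPLICA IDENTITY FULL makes PostgreSQL write complete old-row images to the WAL, which the postgres-cdc connector relies on to emit full before-images for updates and deletes. As an illustrative check with assumed connection parameters (localhost:5432, user 'postgres'), pg_class.relreplident should report 'f' (FULL) for the table:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReplicaIdentityCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                Statement stmt = conn.createStatement();
                ResultSet rs =
                        stmt.executeQuery(
                                "SELECT relreplident FROM pg_class "
                                        + "WHERE relname = 'products' "
                                        + "AND relnamespace = 'inventory'::regnamespace")) {
            rs.next();
            // 'f' = FULL, 'd' = DEFAULT (primary key columns only)
            System.out.println("replica identity: " + rs.getString(1));
        }
    }
}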

@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
#secure-file-priv=/var/lib/mysql-files
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
#log-error=/var/log/mysqld.log
#pid-file=/var/run/mysqld/mysqld.pid
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
# Enable the binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary; the expiration can be short for integration tests but would
# be longer on a production system. Row-level logging is required for CDC ingestion to work.
# A server ID is required; its value will vary across production systems.
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on
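Whether these settings took effect can be confirmed from any client session. The sketch below queries the relevant server variables over JDBC; the URL and the 'mysqluser' credentials (defined in the companion setup script) are assumptions for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogConfigCheck {
    public static void main(String[] args) throws Exception {
        // assumption: localhost:3306; the tests connect to the container's mapped port
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:mysql://localhost:3306", "mysqluser", "mysqlpw");
                Statement stmt = conn.createStatement();
                ResultSet rs =
                        stmt.executeQuery("SELECT @@binlog_format, @@gtid_mode, @@server_id")) {
            rs.next();
            // expected: ROW, ON, 223344 given the config file above
            System.out.printf("binlog_format=%s gtid_mode=%s server_id=%s%n",
                    rs.getString(1), rs.getString(2), rs.getString(3));
        }
    }
}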

@ -0,0 +1,30 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- In production you would almost certainly limit the replication user to the follower (replica) machine,
-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
-- However, in this database we'll grant 2 users different privileges:
--
-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
-- 2) 'mysqluser' - all privileges
--
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: emptydb
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE emptydb;
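As a hypothetical smoke check, the REPLICATION CLIENT grant above can be verified by running SHOW MASTER STATUS as 'flinkuser'. Note that the password used here ('flinkpw') is an assumption: the user itself is provisioned outside this script.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class FlinkUserPrivilegeCheck {
    public static void main(String[] args) throws Exception {
        // assumption: host, port, and the 'flinkpw' password come from the container setup
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:mysql://localhost:3306", "flinkuser", "flinkpw");
                Statement stmt = conn.createStatement();
                // succeeds only with the REPLICATION CLIENT privilege granted above
                ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
            if (rs.next()) {
                System.out.println("current binlog file: " + rs.getString("File"));
            }
        }
    }
}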

@ -489,7 +489,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
public void cancel() {
// safely and gracefully stop the engine
shutdownEngine();
debeziumChangeFetcher.close();
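// the fetcher is created in run(), so it may still be null if cancel() is invoked before the source started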
if (debeziumChangeFetcher != null) {
debeziumChangeFetcher.close();
}
}
@Override

@ -38,7 +38,7 @@ public class MongoDBContainer extends GenericContainer<MongoDBContainer> {
private static final String DOCKER_IMAGE_NAME = "mongo:5.0.2";
private static final int MONGODB_PORT = 27017;
public static final int MONGODB_PORT = 27017;
public MongoDBContainer() {
super(DOCKER_IMAGE_NAME);

@ -36,7 +36,6 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.jayway.jsonpath.JsonPath;
import com.ververica.cdc.connectors.oracle.utils.OracleCdcContainer;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
@ -47,6 +46,7 @@ import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
@ -78,7 +78,7 @@ public class OracleSourceTest extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(OracleSourceTest.class);
private OracleCdcContainer oracleContainer =
private OracleContainer oracleContainer =
OracleTestUtils.ORACLE_CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG));
@Before
@ -579,8 +579,7 @@ public class OracleSourceTest extends AbstractTestBase {
return basicSourceBuilder(oracleContainer).build();
}
private OracleSource.Builder<SourceRecord> basicSourceBuilder(
OracleCdcContainer oracleContainer) {
private OracleSource.Builder<SourceRecord> basicSourceBuilder(OracleContainer oracleContainer) {
return OracleSource.<SourceRecord>builder()
.hostname(oracleContainer.getHost())

@ -25,13 +25,13 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import com.ververica.cdc.connectors.oracle.utils.OracleCdcContainer;
import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
@ -54,7 +54,7 @@ public class OracleConnectorITCase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(OracleConnectorITCase.class);
private OracleCdcContainer oracleContainer =
private OracleContainer oracleContainer =
OracleTestUtils.ORACLE_CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG));
private final StreamExecutionEnvironment env =

@ -1,132 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.utils;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.images.builder.ImageFromDockerfile;
/**
* Docker container for Oracle. The difference between this class and {@link
* org.testcontainers.containers.OracleContainer} is that TC OracleContainer has problems when using
* base docker image "wnameless/oracle-xe-11g-r2" (can't find log output "DATABASE IS READY TO
* USE!").
*/
public class OracleCdcContainer extends JdbcDatabaseContainer<OracleCdcContainer> {
public static final String NAME = "oracle";
private static final ImageFromDockerfile ORACLE_IMAGE =
new ImageFromDockerfile("oracle-xe-11g-tmp")
.withFileFromClasspath(".", "docker")
.withFileFromClasspath(
"assets/activate-archivelog.sh", "docker/assets/activate-archivelog.sh")
.withFileFromClasspath(
"assets/activate-archivelog.sql",
"docker/assets/activate-archivelog.sql");
private static final int ORACLE_PORT = 1521;
private static final int APEX_HTTP_PORT = 8080;
private static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 120;
private String username = "system";
private String password = "oracle";
public OracleCdcContainer() {
super(ORACLE_IMAGE);
preconfigure();
}
private void preconfigure() {
withStartupTimeoutSeconds(DEFAULT_STARTUP_TIMEOUT_SECONDS);
withConnectTimeoutSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS);
addExposedPorts(ORACLE_PORT, APEX_HTTP_PORT);
}
@Override
protected Integer getLivenessCheckPort() {
return getMappedPort(ORACLE_PORT);
}
@Override
public String getDriverClassName() {
return "oracle.jdbc.OracleDriver";
}
@Override
public String getJdbcUrl() {
return "jdbc:oracle:thin:"
+ getUsername()
+ "/"
+ getPassword()
+ "@"
+ getHost()
+ ":"
+ getOraclePort()
+ ":"
+ getSid();
}
@Override
public String getUsername() {
return username;
}
@Override
public String getPassword() {
return password;
}
@Override
public OracleCdcContainer withUsername(String username) {
this.username = username;
return self();
}
@Override
public OracleCdcContainer withPassword(String password) {
this.password = password;
return self();
}
@Override
public OracleCdcContainer withUrlParam(String paramName, String paramValue) {
throw new UnsupportedOperationException("The OracleDb does not support this");
}
@SuppressWarnings("SameReturnValue")
public String getSid() {
return "xe";
}
public Integer getOraclePort() {
return getMappedPort(ORACLE_PORT);
}
@SuppressWarnings("unused")
public Integer getWebPort() {
return getMappedPort(APEX_HTTP_PORT);
}
@Override
public String getTestQueryString() {
return "SELECT 1 FROM DUAL";
}
}

@ -18,19 +18,31 @@
package com.ververica.cdc.connectors.oracle.utils;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.images.builder.ImageFromDockerfile;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/** Utility class for oracle tests. */
public class OracleTestUtils {
public static final OracleCdcContainer ORACLE_CONTAINER = new OracleCdcContainer();
public static final OracleContainer ORACLE_CONTAINER =
new OracleContainer(
new ImageFromDockerfile("oracle-xe-11g-tmp")
.withFileFromClasspath(".", "docker")
.withFileFromClasspath(
"assets/activate-archivelog.sh",
"docker/assets/activate-archivelog.sh")
.withFileFromClasspath(
"assets/activate-archivelog.sql",
"docker/assets/activate-archivelog.sql"));
public static final String ORACLE_USER = "dbzuser";
public static final String ORACLE_PWD = "dbz";
public static Connection getJdbcConnection(OracleCdcContainer oracleContainer)
public static Connection getJdbcConnection(OracleContainer oracleContainer)
throws SQLException {
return DriverManager.getConnection(oracleContainer.getJdbcUrl(), ORACLE_USER, ORACLE_PWD);
}
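With the custom container class removed, tests share the single ORACLE_CONTAINER instance. The following is an illustrative lifecycle sketch, assuming it lives in (or imports from) com.ververica.cdc.connectors.oracle.utils; Startables and the JDBC helper mirror how the ITCases start the container:

import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
import java.sql.Statement;
import java.util.stream.Stream;

public class OracleContainerUsageSketch {
    public static void main(String[] args) throws Exception {
        // start the shared container; deepStart also starts any dependent containers
        Startables.deepStart(Stream.of(OracleTestUtils.ORACLE_CONTAINER)).join();
        try (Connection conn = OracleTestUtils.getJdbcConnection(OracleTestUtils.ORACLE_CONTAINER);
                Statement stmt = conn.createStatement()) {
            // the container's own test query, just to prove connectivity
            stmt.execute("SELECT 1 FROM DUAL");
        } finally {
            OracleTestUtils.ORACLE_CONTAINER.stop();
        }
    }
}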

@ -44,6 +44,7 @@ under the License.
<module>flink-sql-connector-mongodb-cdc</module>
<module>flink-sql-connector-oracle-cdc</module>
<module>flink-format-changelog-json</module>
<module>flink-cdc-e2e-tests</module>
</modules>
<licenses>
@ -69,7 +70,11 @@ under the License.
<flink.version>1.13.3</flink.version>
<debezium.version>1.5.4.Final</debezium.version>
<geometry.version>2.2.0</geometry.version>
<testcontainers.version>1.16.2</testcontainers.version>
<!-- OracleE2eITCase reports a "container cannot be accessed" error when running in Azure Pipelines with testcontainers 1.16.1.
This might be a conflict between "wnameless/oracle-xe-11g-r2" and testcontainers 1.16.
We may need to upgrade our Oracle base image to "gvenzl/oracle-xe", which is the default image of testcontainers 1.16.
See https://github.com/testcontainers/testcontainers-java/issues/4297 for more details. -->
<testcontainers.version>1.15.3</testcontainers.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>

@ -82,6 +82,8 @@ jobs:
module: oracle
mongodb:
module: mongodb
e2e:
module: e2e
misc:
module: misc
steps:

@ -20,6 +20,7 @@ STAGE_MYSQL="mysql"
STAGE_POSTGRES="postgres"
STAGE_ORACLE="oracle"
STAGE_MONGODB="mongodb"
STAGE_E2E="e2e"
STAGE_MISC="misc"
MODULES_MYSQL="\
@ -38,6 +39,9 @@ MODULES_MONGODB="\
flink-connector-mongodb-cdc,\
flink-sql-connector-mongodb-cdc"
MODULES_E2E="\
flink-cdc-e2e-tests"
function get_compile_modules_for_stage() {
local stage=$1
@ -54,6 +58,11 @@ function get_compile_modules_for_stage() {
(${STAGE_MONGODB})
echo "-pl $MODULES_MONGODB -am"
;;
(${STAGE_E2E})
# compile everything; using the -am switch does not work with negated module lists!
# the negation takes precedence, thus not all required modules would be built
echo ""
;;
(${STAGE_MISC})
# compile everything; using the -am switch does not work with negated module lists!
# the negation takes precedence, thus not all required modules would be built
@ -69,11 +78,13 @@ function get_test_modules_for_stage() {
local modules_postgres=$MODULES_POSTGRES
local modules_oracle=$MODULES_ORACLE
local modules_mongodb=$MODULES_MONGODB
local modules_e2e=$MODULES_E2E
local negated_mysql=\!${MODULES_MYSQL//,/,\!}
local negated_postgres=\!${MODULES_POSTGRES//,/,\!}
local negated_oracle=\!${MODULES_ORACLE//,/,\!}
local negated_mongodb=\!${MODULES_MONGODB//,/,\!}
local modules_misc="$negated_mysql,$negated_postgres,$negated_oracle,$negated_mongodb"
local negated_e2e=\!${MODULES_E2E//,/,\!}
local modules_misc="$negated_mysql,$negated_postgres,$negated_oracle,$negated_mongodb,$negated_e2e"
case ${stage} in
(${STAGE_MYSQL})
@ -88,6 +99,9 @@ function get_test_modules_for_stage() {
(${STAGE_MONGODB})
echo "-pl $modules_mongodb"
;;
(${STAGE_E2E})
echo "-pl $modules_e2e"
;;
(${STAGE_MISC})
echo "-pl $modules_misc"
;;
