[sqlserver] Add e2e tests for sqlserver cdc connector (#837)

pull/857/head
gongzhongqiang 3 years ago committed by GitHub
parent e70e0f1b68
commit b27d7f96fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -76,6 +76,13 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-sqlserver-cdc</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-test-util</artifactId>
@ -102,6 +109,12 @@ under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mssqlserver</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@ -218,6 +231,16 @@ under the License.
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>${project.version}</version>
<destFileName>sqlserver-cdc-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</plugin>

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.tests;
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
import com.ververica.cdc.connectors.tests.utils.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
/** End-to-end tests for sqlserver-cdc connector uber jar. */
public class SqlServerE2eITCase extends FlinkContainerTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(SqlServerE2eITCase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private static final String INTER_CONTAINER_SQL_SERVER_ALIAS = "mssqlserver";
private static final Path sqlServerCdcJar =
TestUtils.getResource("sqlserver-cdc-connector.jar");
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ClassRule
public static final MSSQLServerContainer MSSQL_SERVER_CONTAINER =
new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest")
.withPassword("Password!")
.withEnv("MSSQL_AGENT_ENABLED", "true")
.withEnv("MSSQL_PID", "Standard")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_SQL_SERVER_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Before
public void before() {
super.before();
initializeSqlServerTable("sqlserver_inventory");
}
@Test
public void testSqlServerCDC() throws Exception {
List<String> sqlLines =
Arrays.asList(
"CREATE TABLE products_source (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'sqlserver-cdc',",
" 'hostname' = '" + INTER_CONTAINER_SQL_SERVER_ALIAS + "',",
" 'port' = '" + MSSQL_SERVER_CONTAINER.MS_SQL_SERVER_PORT + "',",
" 'username' = '" + MSSQL_SERVER_CONTAINER.getUsername() + "',",
" 'password' = '" + MSSQL_SERVER_CONTAINER.getPassword() + "',",
" 'database-name' = 'inventory',",
" 'schema-name' = 'dbo',",
" 'table-name' = 'products'",
");",
"CREATE TABLE products_sink (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'jdbc',",
String.format(
" 'url' = 'jdbc:mysql://%s:3306/%s',",
INTER_CONTAINER_MYSQL_ALIAS,
mysqlInventoryDatabase.getDatabaseName()),
" 'table-name' = 'products_sink',",
" 'username' = '" + MYSQL_TEST_USER + "',",
" 'password' = '" + MYSQL_TEST_PASSWORD + "'",
");",
"INSERT INTO products_sink",
"SELECT * FROM products_source;");
submitSQLJob(sqlLines, sqlServerCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
// generate binlogs
try (Connection conn = getSqlServerJdbcConnection();
Statement statement = conn.createStatement()) {
statement.execute(
"UPDATE inventory.dbo.products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE inventory.dbo.products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE inventory.dbo.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE inventory.dbo.products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM inventory.dbo.products WHERE id=111;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
// assert final results
String mysqlUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
JdbcProxy proxy =
new JdbcProxy(mysqlUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS);
List<String> expectResult =
Arrays.asList(
"101,scooter,Small 2-wheel scooter,3.14",
"102,car battery,12V car battery,8.1",
"103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8",
"104,hammer,12oz carpenter's hammer,0.75",
"105,hammer,14oz carpenter's hammer,0.875",
"106,hammer,18oz carpenter hammer,1.0",
"107,rocks,box of assorted rocks,5.1",
"108,jacket,water resistent black wind breaker,0.1",
"109,spare tire,24 inch spare tire,22.2",
"110,jacket,new water resistent white wind breaker,0.5");
proxy.checkResultWithTimeout(
expectResult,
"products_sink",
new String[] {"id", "name", "description", "weight"},
60000L);
}
private void initializeSqlServerTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile = SqlServerE2eITCase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getSqlServerJdbcConnection();
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Connection getSqlServerJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MSSQL_SERVER_CONTAINER.getJdbcUrl(),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword());
}
}

@ -0,0 +1,50 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------
-- Create the inventory database
CREATE DATABASE inventory;
USE inventory;
EXEC sys.sp_cdc_enable_db;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
INSERT INTO products(name,description,weight)
VALUES ('scooter','Small 2-wheel scooter',3.14);
INSERT INTO products(name,description,weight)
VALUES ('car battery','12V car battery',8.1);
INSERT INTO products(name,description,weight)
VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO products(name,description,weight)
VALUES ('hammer','12oz carpenter''s hammer',0.75);
INSERT INTO products(name,description,weight)
VALUES ('hammer','14oz carpenter''s hammer',0.875);
INSERT INTO products(name,description,weight)
VALUES ('hammer','16oz carpenter''s hammer',1.0);
INSERT INTO products(name,description,weight)
VALUES ('rocks','box of assorted rocks',5.3);
INSERT INTO products(name,description,weight)
VALUES ('jacket','water resistent black wind breaker',0.1);
INSERT INTO products(name,description,weight)
VALUES ('spare tire','24 inch spare tire',22.2);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;
Loading…
Cancel
Save