From b27d7f96fd147a6bcc0ae81523ab707edbd481ba Mon Sep 17 00:00:00 2001
From: gongzhongqiang <764629910@qq.com>
Date: Sun, 13 Feb 2022 08:36:31 +0800
Subject: [PATCH] [sqlserver] Add e2e tests for sqlserver cdc connector (#837)
---
flink-cdc-e2e-tests/pom.xml | 23 ++
.../connectors/tests/SqlServerE2eITCase.java | 200 ++++++++++++++++++
.../container-license-acceptance.txt | 1 +
.../resources/ddl/sqlserver_inventory.sql | 50 +++++
4 files changed, 274 insertions(+)
create mode 100644 flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/SqlServerE2eITCase.java
create mode 100644 flink-cdc-e2e-tests/src/test/resources/container-license-acceptance.txt
create mode 100644 flink-cdc-e2e-tests/src/test/resources/ddl/sqlserver_inventory.sql
diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml
index f1186e9b3..1ff13c070 100644
--- a/flink-cdc-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/pom.xml
@@ -76,6 +76,13 @@ under the License.
test-jar
test
+
+ com.ververica
+ flink-connector-sqlserver-cdc
+ ${project.version}
+ test-jar
+ test
+
com.ververica
flink-connector-test-util
@@ -102,6 +109,12 @@ under the License.
${testcontainers.version}
test
+
+ org.testcontainers
+ mssqlserver
+ ${testcontainers.version}
+ test
+
@@ -218,6 +231,16 @@ under the License.
${project.build.directory}/dependencies
+
+
+ com.ververica
+ flink-sql-connector-sqlserver-cdc
+ ${project.version}
+ sqlserver-cdc-connector.jar
+ jar
+ ${project.build.directory}/dependencies
+
+
diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/SqlServerE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/SqlServerE2eITCase.java
new file mode 100644
index 000000000..655e72cd2
--- /dev/null
+++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/SqlServerE2eITCase.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.tests;
+
+import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
+import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
+import com.ververica.cdc.connectors.tests.utils.TestUtils;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertNotNull;
+
+/** End-to-end tests for sqlserver-cdc connector uber jar. */
+public class SqlServerE2eITCase extends FlinkContainerTestEnvironment {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SqlServerE2eITCase.class);
+ private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+ private static final String INTER_CONTAINER_SQL_SERVER_ALIAS = "mssqlserver";
+ private static final Path sqlServerCdcJar =
+ TestUtils.getResource("sqlserver-cdc-connector.jar");
+ private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
+ private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+
+ @ClassRule
+ public static final MSSQLServerContainer MSSQL_SERVER_CONTAINER =
+ new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest")
+ .withPassword("Password!")
+ .withEnv("MSSQL_AGENT_ENABLED", "true")
+ .withEnv("MSSQL_PID", "Standard")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_SQL_SERVER_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ @Before
+ public void before() {
+ super.before();
+ initializeSqlServerTable("sqlserver_inventory");
+ }
+
+ @Test
+ public void testSqlServerCDC() throws Exception {
+ List sqlLines =
+ Arrays.asList(
+ "CREATE TABLE products_source (",
+ " `id` INT NOT NULL,",
+ " name STRING,",
+ " description STRING,",
+ " weight DECIMAL(10,3),",
+ " primary key (`id`) not enforced",
+ ") WITH (",
+ " 'connector' = 'sqlserver-cdc',",
+ " 'hostname' = '" + INTER_CONTAINER_SQL_SERVER_ALIAS + "',",
+ " 'port' = '" + MSSQL_SERVER_CONTAINER.MS_SQL_SERVER_PORT + "',",
+ " 'username' = '" + MSSQL_SERVER_CONTAINER.getUsername() + "',",
+ " 'password' = '" + MSSQL_SERVER_CONTAINER.getPassword() + "',",
+ " 'database-name' = 'inventory',",
+ " 'schema-name' = 'dbo',",
+ " 'table-name' = 'products'",
+ ");",
+ "CREATE TABLE products_sink (",
+ " `id` INT NOT NULL,",
+ " name STRING,",
+ " description STRING,",
+ " weight DECIMAL(10,3),",
+ " primary key (`id`) not enforced",
+ ") WITH (",
+ " 'connector' = 'jdbc',",
+ String.format(
+ " 'url' = 'jdbc:mysql://%s:3306/%s',",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ mysqlInventoryDatabase.getDatabaseName()),
+ " 'table-name' = 'products_sink',",
+ " 'username' = '" + MYSQL_TEST_USER + "',",
+ " 'password' = '" + MYSQL_TEST_PASSWORD + "'",
+ ");",
+ "INSERT INTO products_sink",
+ "SELECT * FROM products_source;");
+
+ submitSQLJob(sqlLines, sqlServerCdcJar, jdbcJar, mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+
+ // generate binlogs
+ try (Connection conn = getSqlServerJdbcConnection();
+ Statement statement = conn.createStatement()) {
+
+ statement.execute(
+ "UPDATE inventory.dbo.products SET description='18oz carpenter hammer' WHERE id=106;");
+ statement.execute("UPDATE inventory.dbo.products SET weight='5.1' WHERE id=107;");
+ statement.execute(
+ "INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('jacket','water resistent white wind breaker',0.2);"); // 110
+ statement.execute(
+ "INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);");
+ statement.execute(
+ "UPDATE inventory.dbo.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+ statement.execute("UPDATE inventory.dbo.products SET weight='5.17' WHERE id=111;");
+ statement.execute("DELETE FROM inventory.dbo.products WHERE id=111;");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ // assert final results
+ String mysqlUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(),
+ MYSQL.getDatabasePort(),
+ mysqlInventoryDatabase.getDatabaseName());
+ JdbcProxy proxy =
+ new JdbcProxy(mysqlUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS);
+ List expectResult =
+ Arrays.asList(
+ "101,scooter,Small 2-wheel scooter,3.14",
+ "102,car battery,12V car battery,8.1",
+ "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8",
+ "104,hammer,12oz carpenter's hammer,0.75",
+ "105,hammer,14oz carpenter's hammer,0.875",
+ "106,hammer,18oz carpenter hammer,1.0",
+ "107,rocks,box of assorted rocks,5.1",
+ "108,jacket,water resistent black wind breaker,0.1",
+ "109,spare tire,24 inch spare tire,22.2",
+ "110,jacket,new water resistent white wind breaker,0.5");
+ proxy.checkResultWithTimeout(
+ expectResult,
+ "products_sink",
+ new String[] {"id", "name", "description", "weight"},
+ 60000L);
+ }
+
+ private void initializeSqlServerTable(String sqlFile) {
+ final String ddlFile = String.format("ddl/%s.sql", sqlFile);
+ final URL ddlTestFile = SqlServerE2eITCase.class.getClassLoader().getResource(ddlFile);
+ assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+ try (Connection connection = getSqlServerJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ final List statements =
+ Arrays.stream(
+ Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
+ .map(String::trim)
+ .filter(x -> !x.startsWith("--") && !x.isEmpty())
+ .map(
+ x -> {
+ final Matcher m =
+ COMMENT_PATTERN.matcher(x);
+ return m.matches() ? m.group(1) : x;
+ })
+ .collect(Collectors.joining("\n"))
+ .split(";"))
+ .collect(Collectors.toList());
+ for (String stmt : statements) {
+ statement.execute(stmt);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Connection getSqlServerJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ MSSQL_SERVER_CONTAINER.getJdbcUrl(),
+ MSSQL_SERVER_CONTAINER.getUsername(),
+ MSSQL_SERVER_CONTAINER.getPassword());
+ }
+}
diff --git a/flink-cdc-e2e-tests/src/test/resources/container-license-acceptance.txt b/flink-cdc-e2e-tests/src/test/resources/container-license-acceptance.txt
new file mode 100644
index 000000000..56ecd3737
--- /dev/null
+++ b/flink-cdc-e2e-tests/src/test/resources/container-license-acceptance.txt
@@ -0,0 +1 @@
+mcr.microsoft.com/mssql/server:2019-latest
\ No newline at end of file
diff --git a/flink-cdc-e2e-tests/src/test/resources/ddl/sqlserver_inventory.sql b/flink-cdc-e2e-tests/src/test/resources/ddl/sqlserver_inventory.sql
new file mode 100644
index 000000000..8b2015065
--- /dev/null
+++ b/flink-cdc-e2e-tests/src/test/resources/ddl/sqlserver_inventory.sql
@@ -0,0 +1,50 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: inventory
+-- ----------------------------------------------------------------------------------------------------------------
+-- Create the inventory database
+CREATE DATABASE inventory;
+
+USE inventory;
+EXEC sys.sp_cdc_enable_db;
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products (
+ id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
+ name VARCHAR(255) NOT NULL,
+ description VARCHAR(512),
+ weight FLOAT
+);
+INSERT INTO products(name,description,weight)
+VALUES ('scooter','Small 2-wheel scooter',3.14);
+INSERT INTO products(name,description,weight)
+VALUES ('car battery','12V car battery',8.1);
+INSERT INTO products(name,description,weight)
+VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
+INSERT INTO products(name,description,weight)
+VALUES ('hammer','12oz carpenter''s hammer',0.75);
+INSERT INTO products(name,description,weight)
+VALUES ('hammer','14oz carpenter''s hammer',0.875);
+INSERT INTO products(name,description,weight)
+VALUES ('hammer','16oz carpenter''s hammer',1.0);
+INSERT INTO products(name,description,weight)
+VALUES ('rocks','box of assorted rocks',5.3);
+INSERT INTO products(name,description,weight)
+VALUES ('jacket','water resistent black wind breaker',0.1);
+INSERT INTO products(name,description,weight)
+VALUES ('spare tire','24 inch spare tire',22.2);
+EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;