values) throws Exception {
+ if (values != null) {
+ values.forEach(
+ v -> Preconditions.checkNotNull(v, "You cannot add null to a ListState."));
+
+ list.addAll(values);
+ }
+ }
+ }
+}
diff --git a/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2TestBase.java b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2TestBase.java
new file mode 100644
index 000000000..8cc38ef5d
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2TestBase.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.db2;
+
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Db2Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Base class for testing the DB2 CDC source. It provides a DB2 container with the ASN capture
+ * agent enabled (DB2 has no binlog; change data capture relies on the ASN capture program).
+ */
+public class Db2TestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Db2TestBase.class);
+
+ private static final DockerImageName DEBEZIUM_DOCKER_IMAGE_NAME =
+ DockerImageName.parse(
+ new ImageFromDockerfile("custom/db2-cdc:1.4")
+ .withDockerfile(getFilePath("db2_server/Dockerfile"))
+ .get())
+ .asCompatibleSubstituteFor("ibmcom/db2");
+ private static final Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(LOG);
+ private static boolean db2AsnAgentRunning = false;
+
+ protected static final Db2Container DB2_CONTAINER =
+ new Db2Container(DEBEZIUM_DOCKER_IMAGE_NAME)
+ .withDatabaseName("testdb")
+ .withUsername("db2inst1")
+ .withPassword("flinkpw")
+ .withEnv("AUTOCONFIG", "false")
+ .withEnv("ARCHIVE_LOGS", "true")
+ .acceptLicense()
+ .withLogConsumer(logConsumer)
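+ // the marker line below is printed by db2_server/cdcsetup.sh once the ASN
+ // capture agent has been enabled inside the container; use it as a readiness signal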
+ .withLogConsumer(
+ outputFrame -> {
+ if (outputFrame
+ .getUtf8String()
+ .contains("The asncdc program enable finished")) {
+ db2AsnAgentRunning = true;
+ }
+ });
+
+ @BeforeClass
+ public static void startContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(DB2_CONTAINER)).join();
+ LOG.info("Containers are started.");
+
+ LOG.info("Waiting db2 asn agent start...");
+ while (!db2AsnAgentRunning) {
+ try {
+ Thread.sleep(5000L);
+ } catch (InterruptedException e) {
+ LOG.error("unexpected interrupted exception", e);
+ }
+ }
+ LOG.info("Db2 asn agent are started.");
+ }
+
+ protected Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ DB2_CONTAINER.getJdbcUrl(),
+ DB2_CONTAINER.getUsername(),
+ DB2_CONTAINER.getPassword());
+ }
+
+ private static Path getFilePath(String resourceFilePath) {
+ Path path = null;
+ try {
+ URL filePath = Db2TestBase.class.getClassLoader().getResource(resourceFilePath);
+ assertNotNull("Cannot locate " + resourceFilePath, filePath);
+ path = Paths.get(filePath.toURI());
+ } catch (URISyntaxException e) {
+ LOG.error("Cannot get path from URI.", e);
+ }
+ return path;
+ }
+}
diff --git a/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2ConnectorITCase.java b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2ConnectorITCase.java
new file mode 100644
index 000000000..f8120584e
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2ConnectorITCase.java
@@ -0,0 +1,448 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.db2.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.utils.LegacyRowResource;
+
+import com.ververica.cdc.connectors.db2.Db2TestBase;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.testcontainers.containers.Db2Container.DB2_PORT;
+
+/** Integration tests for DB2 CDC source. */
+public class Db2ConnectorITCase extends Db2TestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(Db2ConnectorITCase.class);
+
+ private final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+ private final StreamTableEnvironment tEnv =
+ StreamTableEnvironment.create(
+ env, EnvironmentSettings.newInstance().inStreamingMode().build());
+
+ @ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
+
+ @Before
+ public void before() {
+ TestValuesTableFactory.clearAllData();
+ env.setParallelism(1);
+ }
+
+ @Test
+ public void testConsumingAllEvents()
+ throws SQLException, InterruptedException, ExecutionException {
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE debezium_source ("
+ + " ID INT NOT NULL,"
+ + " NAME STRING,"
+ + " DESCRIPTION STRING,"
+ + " WEIGHT DECIMAL(10,3)"
+ + ") WITH ("
+ + " 'connector' = 'db2-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s'"
+ + ")",
+ DB2_CONTAINER.getHost(),
+ DB2_CONTAINER.getMappedPort(DB2_PORT),
+ DB2_CONTAINER.getUsername(),
+ DB2_CONTAINER.getPassword(),
+ DB2_CONTAINER.getDatabaseName(),
+ "DB2INST1",
+ "PRODUCTS");
+ String sinkDDL =
+ "CREATE TABLE sink ("
+ + " name STRING,"
+ + " weightSum DECIMAL(10,3),"
+ + " PRIMARY KEY (name) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false',"
+ + " 'sink-expected-messages-num' = '20'"
+ + ")";
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult result =
+ tEnv.executeSql(
+ "INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
+
+ waitForSnapshotStarted("sink");
+
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(
+ "UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;");
+ statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.1' WHERE ID=107;");
+ statement.execute(
+ "INSERT INTO DB2INST1.PRODUCTS VALUES (110,'jacket','water resistent white wind breaker',0.2);");
+ statement.execute(
+ "INSERT INTO DB2INST1.PRODUCTS VALUES (111,'scooter','Big 2-wheel scooter ',5.18);");
+ statement.execute(
+ "UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;");
+ statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.17' WHERE ID=111;");
+ statement.execute("DELETE FROM DB2INST1.PRODUCTS WHERE ID=111;");
+ }
+
+ waitForSinkSize("sink", 20);
+
+ /*
+ *
+ * The final database table looks like this:
+ *
+ * > SELECT * FROM DB2INST1.PRODUCTS;
+ * +-----+--------------------+---------------------------------------------------------+--------+
+ * | ID | NAME | DESCRIPTION | WEIGHT |
+ * +-----+--------------------+---------------------------------------------------------+--------+
+ * | 101 | scooter | Small 2-wheel scooter | 3.14 |
+ * | 102 | car battery | 12V car battery | 8.1 |
+ * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
+ * | 104 | hammer | 12oz carpenter's hammer | 0.75 |
+ * | 105 | hammer | 14oz carpenter's hammer | 0.875 |
+ * | 106 | hammer | 18oz carpenter hammer | 1 |
+ * | 107 | rocks | box of assorted rocks | 5.1 |
+ * | 108 | jacket | water resistent black wind breaker | 0.1 |
+ * | 109 | spare tire | 24 inch spare tire | 22.2 |
+ * | 110 | jacket | new water resistent white wind breaker | 0.5 |
+ * +-----+--------------------+---------------------------------------------------------+--------+
+ *
+ */
+
+ String[] expected =
+ new String[] {
+ "scooter,3.140",
+ "car battery,8.100",
+ "12-pack drill bits,0.800",
+ "hammer,2.625",
+ "rocks,5.100",
+ "jacket,0.600",
+ "spare tire,22.200"
+ };
+
+ List<String> actual = TestValuesTableFactory.getResults("sink");
+ assertThat(actual, containsInAnyOrder(expected));
+
+ result.getJobClient().get().cancel().get();
+ }
+
+ @Test
+ public void testAllTypes() throws Exception {
+ // NOTE: DB2 is case-insensitive by default and the schema returned by Debezium
+ // is uppercase, so we must use uppercase names when defining a DB2 table.
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE full_types (\n"
+ + " ID INTEGER NOT NULL,\n"
+ // Debezium cannot track db2 boolean type, see:
+ // https://issues.redhat.com/browse/DBZ-2587
+ // + " BOOLEAN_C BOOLEAN NOT NULL,\n"
+ + " SMALL_C SMALLINT,\n"
+ + " INT_C INTEGER,\n"
+ + " BIG_C BIGINT,\n"
+ + " REAL_C FLOAT,\n"
+ + " DOUBLE_C DOUBLE,\n"
+ + " NUMERIC_C DECIMAL(10, 5),\n"
+ + " DECIMAL_C DECIMAL(10, 1),\n"
+ + " VARCHAR_C STRING,\n"
+ + " CHAR_C STRING,\n"
+ + " CHARACTER_C STRING,\n"
+ + " TIMESTAMP_C TIMESTAMP(3),\n"
+ + " DATE_C DATE,\n"
+ + " TIME_C TIME(0),\n"
+ + " DEFAULT_NUMERIC_C DECIMAL,\n"
+ + " TIMESTAMP_PRECISION_C TIMESTAMP(9)\n"
+ + ") WITH ("
+ + " 'connector' = 'db2-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s'"
+ + ")",
+ DB2_CONTAINER.getHost(),
+ DB2_CONTAINER.getMappedPort(DB2_PORT),
+ DB2_CONTAINER.getUsername(),
+ DB2_CONTAINER.getPassword(),
+ DB2_CONTAINER.getDatabaseName(),
+ "DB2INST1",
+ "FULL_TYPES");
+ String sinkDDL =
+ "CREATE TABLE sink (\n"
+ + " id INTEGER NOT NULL,\n"
+ + " small_c SMALLINT,\n"
+ + " int_c INTEGER,\n"
+ + " big_c BIGINT,\n"
+ + " real_c FLOAT,\n"
+ + " double_c DOUBLE,\n"
+ + " numeric_c DECIMAL(10, 5),\n"
+ + " decimal_c DECIMAL(10, 1),\n"
+ + " varchar_c STRING,\n"
+ + " char_c STRING,\n"
+ + " character_c STRING,\n"
+ + " timestamp_c TIMESTAMP(3),\n"
+ + " date_c DATE,\n"
+ + " time_c TIME(0),\n"
+ + " default_numeric_c DECIMAL,\n"
+ + " timestamp_precision_c TIMESTAMP(9)\n"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ")";
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types");
+
+ waitForSnapshotStarted("sink");
+
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("UPDATE DB2INST1.FULL_TYPES SET SMALL_C=0 WHERE ID=1;");
+ }
+
+ waitForSinkSize("sink", 3);
+
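+ // 3 raw records: the snapshot row (+I) plus the -U/+U pair from the UPDATE above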
+ List<String> expected =
+ Arrays.asList(
+ "+I(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)",
+ "-U(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)",
+ "+U(1,0,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)");
+ List<String> actual = TestValuesTableFactory.getRawResults("sink");
+ assertEquals(expected, actual);
+
+ result.getJobClient().get().cancel().get();
+ }
+
+ @Test
+ public void testStartupFromLatestOffset() throws Exception {
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE debezium_source ("
+ + " ID INT NOT NULL,"
+ + " NAME STRING,"
+ + " DESCRIPTION STRING,"
+ + " WEIGHT DECIMAL(10,3)"
+ + ") WITH ("
+ + " 'connector' = 'db2-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s' ,"
+ + " 'scan.startup.mode' = 'latest-offset'"
+ + ")",
+ DB2_CONTAINER.getHost(),
+ DB2_CONTAINER.getMappedPort(DB2_PORT),
+ DB2_CONTAINER.getUsername(),
+ DB2_CONTAINER.getPassword(),
+ DB2_CONTAINER.getDatabaseName(),
+ "DB2INST1",
+ "PRODUCTS1");
+ String sinkDDL =
+ "CREATE TABLE sink "
+ + " WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ") LIKE debezium_source (EXCLUDING OPTIONS)";
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+ // wait for the source to start up; there is no better way to wait for it, so sleep for now
+ do {
+ Thread.sleep(5000L);
+ } while (result.getJobClient().get().getJobStatus().get() != RUNNING);
+ Thread.sleep(30000L);
+ LOG.info("Snapshot should end and start to read binlog.");
+
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'jacket','water resistent white wind breaker',0.2)");
+ statement.execute(
+ "INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'scooter','Big 2-wheel scooter ',5.18)");
+ statement.execute(
+ "UPDATE DB2INST1.PRODUCTS1 SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110");
+ statement.execute("UPDATE DB2INST1.PRODUCTS1 SET WEIGHT='5.17' WHERE ID=111");
+ statement.execute("DELETE FROM DB2INST1.PRODUCTS1 WHERE ID=111");
+ }
+
+ waitForSinkSize("sink", 7);
+
+ String[] expected =
+ new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
+
+ List<String> actual = TestValuesTableFactory.getResults("sink");
+ assertThat(actual, containsInAnyOrder(expected));
+
+ result.getJobClient().get().cancel().get();
+ }
+
+ @Test
+ public void testMetadataColumns() throws Throwable {
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE debezium_source ("
+ + " DB_NAME STRING METADATA FROM 'database_name' VIRTUAL,"
+ + " SCHEMA_NAME STRING METADATA FROM 'schema_name' VIRTUAL,"
+ + " TABLE_NAME STRING METADATA FROM 'table_name' VIRTUAL,"
+ + " ID INT NOT NULL,"
+ + " NAME STRING,"
+ + " DESCRIPTION STRING,"
+ + " WEIGHT DECIMAL(10,3),"
+ + " PRIMARY KEY (ID) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'db2-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s'"
+ + ")",
+ DB2_CONTAINER.getHost(),
+ DB2_CONTAINER.getMappedPort(DB2_PORT),
+ DB2_CONTAINER.getUsername(),
+ DB2_CONTAINER.getPassword(),
+ DB2_CONTAINER.getDatabaseName(),
+ "DB2INST1",
+ "PRODUCTS2");
+ String sinkDDL =
+ "CREATE TABLE sink ("
+ + " database_name STRING,"
+ + " schema_name STRING,"
+ + " table_name STRING,"
+ + " id int,"
+ + " name STRING,"
+ + " description STRING,"
+ + " weight DECIMAL(10,3),"
+ + " PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false',"
+ + " 'sink-expected-messages-num' = '20'"
+ + ")";
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+
+ waitForSnapshotStarted("sink");
+
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(
+ "UPDATE DB2INST1.PRODUCTS2 SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;");
+ statement.execute("UPDATE DB2INST1.PRODUCTS2 SET WEIGHT='5.1' WHERE ID=107;");
+ statement.execute(
+ "INSERT INTO DB2INST1.PRODUCTS2 VALUES (110,'jacket','water resistent white wind breaker',0.2);");
+ statement.execute(
+ "INSERT INTO DB2INST1.PRODUCTS2 VALUES (111,'scooter','Big 2-wheel scooter ',5.18);");
+ statement.execute(
+ "UPDATE DB2INST1.PRODUCTS2 SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;");
+ statement.execute("UPDATE DB2INST1.PRODUCTS2 SET WEIGHT='5.17' WHERE ID=111;");
+ statement.execute("DELETE FROM DB2INST1.PRODUCTS2 WHERE ID=111;");
+ }
+
+ waitForSinkSize("sink", 16);
+
+ List<String> expected =
+ Arrays.asList(
+ "+I(testdb,DB2INST1,PRODUCTS2,101,scooter,Small 2-wheel scooter,3.140)",
+ "+I(testdb,DB2INST1,PRODUCTS2,102,car battery,12V car battery,8.100)",
+ "+I(testdb,DB2INST1,PRODUCTS2,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
+ "+I(testdb,DB2INST1,PRODUCTS2,104,hammer,12oz carpenter's hammer,0.750)",
+ "+I(testdb,DB2INST1,PRODUCTS2,105,hammer,14oz carpenter's hammer,0.875)",
+ "+I(testdb,DB2INST1,PRODUCTS2,106,hammer,16oz carpenter's hammer,1.000)",
+ "+I(testdb,DB2INST1,PRODUCTS2,107,rocks,box of assorted rocks,5.300)",
+ "+I(testdb,DB2INST1,PRODUCTS2,108,jacket,water resistent black wind breaker,0.100)",
+ "+I(testdb,DB2INST1,PRODUCTS2,109,spare tire,24 inch spare tire,22.200)",
+ "+U(testdb,DB2INST1,PRODUCTS2,106,hammer,18oz carpenter hammer,1.000)",
+ "+U(testdb,DB2INST1,PRODUCTS2,107,rocks,box of assorted rocks,5.100)",
+ "+I(testdb,DB2INST1,PRODUCTS2,110,jacket,water resistent white wind breaker,0.200)",
+ "+I(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.180)",
+ "+U(testdb,DB2INST1,PRODUCTS2,110,jacket,new water resistent white wind breaker,0.500)",
+ "+U(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.170)",
+ "-D(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.170)");
+
+ List<String> actual = TestValuesTableFactory.getRawResults("sink");
+ Collections.sort(expected);
+ Collections.sort(actual);
+ assertEquals(expected, actual);
+ result.getJobClient().get().cancel().get();
+ }
+
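+ /** Waits until the sink has received at least one record, i.e. the snapshot has started. */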
+ private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
+ while (sinkSize(sinkName) == 0) {
+ Thread.sleep(1000L);
+ }
+ }
+
+ private static void waitForSinkSize(String sinkName, int expectedSize)
+ throws InterruptedException {
+ while (sinkSize(sinkName) < expectedSize) {
+ Thread.sleep(1000L);
+ }
+ }
+
+ private static int sinkSize(String sinkName) {
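+ // read the size under a lock so the concurrently running job cannot mutate the
+ // factory's result buffers mid-read (an assumption about the factory's internals)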
+ synchronized (TestValuesTableFactory.class) {
+ try {
+ return TestValuesTableFactory.getRawResults(sinkName).size();
+ } catch (IllegalArgumentException e) {
+ // job is not started yet
+ return 0;
+ }
+ }
+ }
+}
diff --git a/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
new file mode 100644
index 000000000..b9b80d6f8
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.db2.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import org.junit.Test;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
+import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Test for {@link Db2TableSource} created by {@link Db2TableSourceFactory}. */
+public class Db2TableSourceFactoryTest {
+
+ private static final ResolvedSchema SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("aaa", DataTypes.INT().notNull()),
+ Column.physical("bbb", DataTypes.STRING().notNull()),
+ Column.physical("ccc", DataTypes.DOUBLE()),
+ Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
+ Column.physical("eee", DataTypes.TIMESTAMP(3))),
+ new ArrayList<>(),
+ UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
+
+ private static final ResolvedSchema SCHEMA_WITH_METADATA =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("aaa", DataTypes.INT().notNull()),
+ Column.physical("bbb", DataTypes.STRING().notNull()),
+ Column.physical("ccc", DataTypes.DOUBLE()),
+ Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
+ Column.physical("eee", DataTypes.TIMESTAMP(3)),
+ Column.metadata(
+ "database_name", DataTypes.STRING(), "database_name", true),
+ Column.metadata("table_name", DataTypes.STRING(), "table_name", true),
+ Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true),
+ Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true)),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
+
+ private static final String MY_LOCALHOST = "localhost";
+ private static final String MY_USERNAME = "flinkuser";
+ private static final String MY_PASSWORD = "flinkpw";
+ private static final String MY_DATABASE = "myDB";
+ private static final String MY_SCHEMA = "flinkuser";
+ private static final String MY_TABLE = "myTable";
+ private static final Properties PROPERTIES = new Properties();
+
+ @Test
+ public void testCommonProperties() {
+ Map<String, String> properties = getAllOptions();
+
+ // validation for source
+ DynamicTableSource actualSource = createTableSource(properties, SCHEMA);
+ Db2TableSource expectedSource =
+ new Db2TableSource(
+ getPhysicalSchema(SCHEMA),
+ 50000,
+ MY_LOCALHOST,
+ MY_DATABASE,
+ MY_SCHEMA,
+ MY_TABLE,
+ MY_USERNAME,
+ MY_PASSWORD,
+ ZoneId.of("UTC"),
+ PROPERTIES,
+ StartupOptions.initial());
+ assertEquals(expectedSource, actualSource);
+ }
+
+ @Test
+ public void testOptionalProperties() {
+ Map<String, String> options = getAllOptions();
+ options.put("port", "50000");
+ options.put("server-time-zone", "Asia/Shanghai");
+ options.put("debezium.snapshot.mode", "schema_only");
+
+ DynamicTableSource actualSource = createTableSource(options, SCHEMA);
+ Properties dbzProperties = new Properties();
+ dbzProperties.put("snapshot.mode", "schema_only");
+ Db2TableSource expectedSource =
+ new Db2TableSource(
+ getPhysicalSchema(SCHEMA),
+ 50000,
+ MY_LOCALHOST,
+ MY_DATABASE,
+ MY_SCHEMA,
+ MY_TABLE,
+ MY_USERNAME,
+ MY_PASSWORD,
+ ZoneId.of("Asia/Shanghai"),
+ dbzProperties,
+ StartupOptions.latest());
+ assertEquals(expectedSource, actualSource);
+ }
+
+ @Test
+ public void testValidation() {
+ // validate illegal port
+ try {
+ Map<String, String> properties = getAllOptions();
+ properties.put("port", "123b");
+
+ createTableSource(properties, SCHEMA);
+ fail("exception expected");
+ } catch (Throwable t) {
+ assertTrue(
+ ExceptionUtils.findThrowableWithMessage(
+ t, "Could not parse value '123b' for key 'port'.")
+ .isPresent());
+ }
+
+ // validate missing required
+ Factory factory = new Db2TableSourceFactory();
+ for (ConfigOption<?> requiredOption : factory.requiredOptions()) {
+ Map<String, String> properties = getAllOptions();
+ properties.remove(requiredOption.key());
+
+ try {
+ createTableSource(properties, SCHEMA);
+ fail("exception expected");
+ } catch (Throwable t) {
+ assertTrue(
+ ExceptionUtils.findThrowableWithMessage(
+ t,
+ "Missing required options are:\n\n" + requiredOption.key())
+ .isPresent());
+ }
+ }
+
+ // validate unsupported option
+ try {
+ Map<String, String> properties = getAllOptions();
+ properties.put("unknown", "abc");
+
+ createTableSource(properties, SCHEMA);
+ fail("exception expected");
+ } catch (Throwable t) {
+ assertTrue(
+ ExceptionUtils.findThrowableWithMessage(t, "Unsupported options:\n\nunknown")
+ .isPresent());
+ }
+ }
+
+ @Test
+ public void testMetadataColumns() {
+ Map<String, String> properties = getAllOptions();
+
+ // validation for source
+ DynamicTableSource actualSource = createTableSource(properties, SCHEMA_WITH_METADATA);
+ Db2TableSource db2TableSource = (Db2TableSource) actualSource;
+ db2TableSource.applyReadableMetadata(
+ Arrays.asList("op_ts", "database_name", "table_name", "schema_name"),
+ SCHEMA_WITH_METADATA.toSourceRowDataType());
+ actualSource = db2TableSource.copy();
+ Db2TableSource expectedSource =
+ new Db2TableSource(
+ SCHEMA_WITH_METADATA,
+ 50000,
+ MY_LOCALHOST,
+ MY_DATABASE,
+ MY_SCHEMA,
+ MY_TABLE,
+ MY_USERNAME,
+ MY_PASSWORD,
+ ZoneId.of("UTC"),
+ new Properties(),
+ StartupOptions.initial());
+ expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
+ expectedSource.metadataKeys =
+ Arrays.asList("op_ts", "database_name", "table_name", "schema_name");
+
+ assertEquals(expectedSource, actualSource);
+
+ ScanTableSource.ScanRuntimeProvider provider =
+ db2TableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ DebeziumSourceFunction<RowData> debeziumSourceFunction =
+ (DebeziumSourceFunction<RowData>)
+ ((SourceFunctionProvider) provider).createSourceFunction();
+ assertProducedTypeOfSourceFunction(debeziumSourceFunction, expectedSource.producedDataType);
+ }
+
+ private Map<String, String> getAllOptions() {
+ Map<String, String> options = new HashMap<>();
+ options.put("connector", "db2-cdc");
+ options.put("hostname", MY_LOCALHOST);
+ options.put("database-name", MY_DATABASE);
+ options.put("schema-name", MY_SCHEMA);
+ options.put("table-name", MY_TABLE);
+ options.put("username", MY_USERNAME);
+ options.put("password", MY_PASSWORD);
+ return options;
+ }
+
+ private static DynamicTableSource createTableSource(
+ Map<String, String> options, ResolvedSchema schema) {
+ return FactoryUtil.createTableSource(
+ null,
+ ObjectIdentifier.of("default", "default", "t1"),
+ new ResolvedCatalogTable(
+ CatalogTable.of(
+ fromResolvedSchema(schema).toSchema(),
+ "mock source",
+ new ArrayList<>(),
+ options),
+ schema),
+ new Configuration(),
+ Db2TableSourceFactoryTest.class.getClassLoader(),
+ false);
+ }
+}
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile b/flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile
new file mode 100644
index 000000000..42a117d25
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile
@@ -0,0 +1,36 @@
+################################################################################
+# Copyright 2022 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+FROM ibmcom/db2:11.5.0.0a
+
+MAINTAINER Peter Urbanetz
+
+RUN mkdir -p /asncdctools/src
+
+ADD asncdc_UDF.sql /asncdctools/src
+ADD asncdcaddremove.sql /asncdctools/src
+ADD asncdctables.sql /asncdctools/src
+ADD dbsetup.sh /asncdctools/src
+ADD startup-agent.sql /asncdctools/src
+ADD startup-cdc-demo.sql /asncdctools/src
+ADD inventory.sql /asncdctools/src
+ADD column_type_test.sql /asncdctools/src
+ADD asncdc.c /asncdctools/src
+
+RUN chmod -R 777 /asncdctools
+
+RUN mkdir /var/custom
+ADD cdcsetup.sh /var/custom
+RUN chmod -R 777 /var/custom
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc.c b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc.c
new file mode 100644
index 000000000..3d27fe4cb
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc.c
@@ -0,0 +1,178 @@
+/*****************************************************************************
+* Copyright 2022 Ververica Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+******************************************************************************/
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <sqludf.h>
+#include <sqlca.h>
+
+void SQL_API_FN asncdcservice(
+ SQLUDF_VARCHAR *asnCommand, /* input */
+ SQLUDF_VARCHAR *asnService,
+ SQLUDF_CLOB *fileData, /* output */
+ /* null indicators */
+ SQLUDF_NULLIND *asnCommand_ind, /* input */
+ SQLUDF_NULLIND *asnService_ind,
+ SQLUDF_NULLIND *fileData_ind,
+ SQLUDF_TRAIL_ARGS,
+ struct sqludf_dbinfo *dbinfo)
+{
+
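+ /* capture the output of the asncap/asnccmd invocations in a temporary file,
+ which is read back below and returned to the caller as a CLOB */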
+ int fd;
+ char tmpFileName[] = "/tmp/fileXXXXXX";
+ fd = mkstemp(tmpFileName);
+
+ int strcheck = 0;
+ char cmdstring[256];
+
+
+ char* szDb2path = getenv("HOME");
+
+
+
+ char str[20];
+ int len = 0;
+ char c;
+ char *buffer = NULL;
+ FILE *pidfile;
+
+ char dbname[129];
+ memset(dbname, '\0', 129);
+ strncpy(dbname, (char *)(dbinfo->dbname), dbinfo->dbnamelen);
+ dbname[dbinfo->dbnamelen] = '\0';
+
+ int pid;
+ if (strcmp(asnService, "asncdc") == 0)
+ {
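+ /* use pgrep to check whether the capture daemon is already running
+ for this schema and database */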
+ strcheck = sprintf(cmdstring, "pgrep -fx \"%s/sqllib/bin/asncap capture_schema=%s capture_server=%s\" > %s", szDb2path, asnService, dbname, tmpFileName);
+ int callcheck;
+ callcheck = system(cmdstring);
+ pidfile = fopen(tmpFileName, "r");
+ while ((c = fgetc(pidfile)) != EOF)
+ {
+ if (c == '\n')
+ {
+ break;
+ }
+ len++;
+ }
+ buffer = (char *)malloc(sizeof(char) * len);
+ fseek(pidfile, 0, SEEK_SET);
+ fread(buffer, sizeof(char), len, pidfile);
+ fclose(pidfile);
+ pidfile = fopen(tmpFileName, "w");
+ if (strcmp(asnCommand, "start") == 0)
+ {
+ if (len == 0) // is not running
+ {
+ strcheck = sprintf(cmdstring, "%s/sqllib/bin/asncap capture_schema=%s capture_server=%s &", szDb2path, asnService, dbname);
+ fprintf(pidfile, "start --> %s \n", cmdstring);
+ callcheck = system(cmdstring);
+ }
+ else
+ {
+ fprintf(pidfile, "asncap is already running");
+ }
+ }
+ if ((strcmp(asnCommand, "prune") == 0) ||
+ (strcmp(asnCommand, "reinit") == 0) ||
+ (strcmp(asnCommand, "suspend") == 0) ||
+ (strcmp(asnCommand, "resume") == 0) ||
+ (strcmp(asnCommand, "status") == 0) ||
+ (strcmp(asnCommand, "stop") == 0))
+ {
+ if (len > 0)
+ {
+ //buffer[len] = '\0';
+ //strcheck = sprintf(cmdstring, "/bin/kill -SIGINT %s ", buffer);
+ //fprintf(pidfile, "stop --> %s", cmdstring);
+ //callcheck = system(cmdstring);
+ strcheck = sprintf(cmdstring, "%s/sqllib/bin/asnccmd capture_schema=%s capture_server=%s %s >> %s", szDb2path, asnService, dbname, asnCommand, tmpFileName);
+ //fprintf(pidfile, "%s --> %s \n", cmdstring, asnCommand);
+ callcheck = system(cmdstring);
+ }
+ else
+ {
+ fprintf(pidfile, "asncap is not running");
+ }
+ }
+
+ fclose(pidfile);
+ }
+ /* system(cmdstring); */
+
+ int rc = 0;
+ long fileSize = 0;
+ size_t readCnt = 0;
+ FILE *f = NULL;
+
+ f = fopen(tmpFileName, "r");
+ if (!f)
+ {
+ strcpy(SQLUDF_MSGTX, "Could not open file ");
+ strncat(SQLUDF_MSGTX, tmpFileName,
+ SQLUDF_MSGTEXT_LEN - strlen(SQLUDF_MSGTX) - 1);
+ strncpy(SQLUDF_STATE, "38100", SQLUDF_SQLSTATE_LEN);
+ return;
+ }
+
+ rc = fseek(f, 0, SEEK_END);
+ if (rc)
+ {
+ sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
+ strncpy(SQLUDF_STATE, "38101", SQLUDF_SQLSTATE_LEN);
+ return;
+ }
+
+ /* verify the file size */
+ fileSize = ftell(f);
+ if (fileSize > fileData->length)
+ {
+ strcpy(SQLUDF_MSGTX, "File too large");
+ strncpy(SQLUDF_STATE, "38102", SQLUDF_SQLSTATE_LEN);
+ return;
+ }
+
+ /* go to the beginning and read the entire file */
+ rc = fseek(f, 0, 0);
+ if (rc)
+ {
+ sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
+ strncpy(SQLUDF_STATE, "38103", SQLUDF_SQLSTATE_LEN);
+ return;
+ }
+
+ readCnt = fread(fileData->data, 1, fileSize, f);
+ if (readCnt != fileSize)
+ {
+ /* raise a warning that something weird is going on */
+ sprintf(SQLUDF_MSGTX, "Could not read entire file "
+ "(%d vs %d)",
+ readCnt, fileSize);
+ strncpy(SQLUDF_STATE, "01H10", SQLUDF_SQLSTATE_LEN);
+ *fileData_ind = -1;
+ }
+ else
+ {
+ fileData->length = readCnt;
+ *fileData_ind = 0;
+ }
+ // remove temporary file
+ rc = remove(tmpFileName);
+ //fclose(pFile);
+}
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc_UDF.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc_UDF.sql
new file mode 100644
index 000000000..2f83c549d
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc_UDF.sql
@@ -0,0 +1,30 @@
+-- Copyright 2022 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+DROP SPECIFIC FUNCTION ASNCDC.asncdcservice;
+
+CREATE FUNCTION ASNCDC.ASNCDCSERVICES(command VARCHAR(6), service VARCHAR(8))
+ RETURNS CLOB(100K)
+ SPECIFIC asncdcservice
+ EXTERNAL NAME 'asncdc!asncdcservice'
+ LANGUAGE C
+ PARAMETER STYLE SQL
+ DBINFO
+ DETERMINISTIC
+ NOT FENCED
+ RETURNS NULL ON NULL INPUT
+ NO SQL
+ NO EXTERNAL ACTION
+ NO SCRATCHPAD
+ ALLOW PARALLEL
+ NO FINAL CALL;
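+
+-- Example invocation (illustrative):
+--   VALUES ASNCDC.ASNCDCSERVICES('status', 'asncdc');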
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/asncdcaddremove.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdcaddremove.sql
new file mode 100644
index 000000000..1ad349799
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdcaddremove.sql
@@ -0,0 +1,204 @@
+-- Copyright 2022 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Define ASNCDC.REMOVETABLE() and ASNCDC.ADDTABLE()
+-- ASNCDC.ADDTABLE() puts a table in CDC mode, making the ASNCapture server collect changes for the table
+-- ASNCDC.REMOVETABLE() makes the ASNCapture server stop collecting changes for that table
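+-- Example (illustrative): CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS');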
+
+--#SET TERMINATOR @
+CREATE OR REPLACE PROCEDURE ASNCDC.REMOVETABLE(
+in tableschema VARCHAR(128),
+in tablename VARCHAR(128)
+)
+LANGUAGE SQL
+P1:
+BEGIN
+
+DECLARE stmtSQL VARCHAR(2048);
+
+DECLARE SQLCODE INT;
+DECLARE SQLSTATE CHAR(5);
+DECLARE RC_SQLCODE INT DEFAULT 0;
+DECLARE RC_SQLSTATE CHAR(5) DEFAULT '00000';
+
+DECLARE CONTINUE HANDLER FOR SQLEXCEPTION, SQLWARNING, NOT FOUND VALUES (SQLCODE, SQLSTATE) INTO RC_SQLCODE, RC_SQLSTATE;
+
+-- delete ASNCDC.IBMSNAP_PRUNCNTL entries / source
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_PRUNCNTL WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_REGISTER entries / source
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_REGISTER WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- drop CD Table / source
+SET stmtSQL = 'DROP TABLE ASNCDC.CDC_' ||
+ tableschema || '_' || tablename ;
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_SUBS_COLS entries /target
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_COLS WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMSNAP_SUBS_MEMBR entries /target
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_MEMBR WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMQREP_COLVERSION
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_COLVERSION col WHERE EXISTS (SELECT * FROM ASNCDC.IBMQREP_TABVERSION tab WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || ''' AND col.TABLEID1 = tab.TABLEID1 AND col.TABLEID2 = tab.TABLEID2)';
+ EXECUTE IMMEDIATE stmtSQL;
+
+-- delete ASNCDC.IBMQREP_TABVERSION
+SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_TABVERSION WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || '''';
+ EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE NONE';
+EXECUTE IMMEDIATE stmtSQL;
+
+END P1@
+--#SET TERMINATOR ;
+
+--#SET TERMINATOR @
+CREATE OR REPLACE PROCEDURE ASNCDC.ADDTABLE(
+in tableschema VARCHAR(128),
+in tablename VARCHAR(128)
+)
+LANGUAGE SQL
+P1:
+BEGIN
+
+DECLARE SQLSTATE CHAR(5);
+
+DECLARE stmtSQL VARCHAR(2048);
+
+SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE CHANGES';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'CREATE TABLE ASNCDC.CDC_' ||
+ tableschema || '_' || tablename ||
+ ' AS ( SELECT ' ||
+ ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_COMMITSEQ, ' ||
+ ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_INTENTSEQ, ' ||
+ ' CAST ('''' AS CHAR(1)) ' ||
+ ' AS IBMSNAP_OPERATION, t.* FROM ' || tableschema || '.' || tablename || ' as t ) WITH NO DATA ORGANIZE BY ROW ';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
+ tableschema || '_' || tablename ||
+ ' ALTER COLUMN IBMSNAP_COMMITSEQ SET NOT NULL';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
+ tableschema || '_' || tablename ||
+ ' ALTER COLUMN IBMSNAP_INTENTSEQ SET NOT NULL';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
+ tableschema || '_' || tablename ||
+ ' ALTER COLUMN IBMSNAP_OPERATION SET NOT NULL';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'CREATE UNIQUE INDEX ASNCDC.IXCDC_' ||
+ tableschema || '_' || tablename ||
+ ' ON ASNCDC.CDC_' ||
+ tableschema || '_' || tablename ||
+ ' ( IBMSNAP_COMMITSEQ ASC, IBMSNAP_INTENTSEQ ASC ) PCTFREE 0 MINPCTUSED 0';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
+ tableschema || '_' || tablename ||
+ ' VOLATILE CARDINALITY';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_REGISTER (SOURCE_OWNER, SOURCE_TABLE, ' ||
+ 'SOURCE_VIEW_QUAL, GLOBAL_RECORD, SOURCE_STRUCTURE, SOURCE_CONDENSED, ' ||
+ 'SOURCE_COMPLETE, CD_OWNER, CD_TABLE, PHYS_CHANGE_OWNER, ' ||
+ 'PHYS_CHANGE_TABLE, CD_OLD_SYNCHPOINT, CD_NEW_SYNCHPOINT, ' ||
+ 'DISABLE_REFRESH, CCD_OWNER, CCD_TABLE, CCD_OLD_SYNCHPOINT, ' ||
+ 'SYNCHPOINT, SYNCHTIME, CCD_CONDENSED, CCD_COMPLETE, ARCH_LEVEL, ' ||
+ 'DESCRIPTION, BEFORE_IMG_PREFIX, CONFLICT_LEVEL, ' ||
+ 'CHG_UPD_TO_DEL_INS, CHGONLY, RECAPTURE, OPTION_FLAGS, ' ||
+ 'STOP_ON_ERROR, STATE, STATE_INFO ) VALUES( ' ||
+ '''' || tableschema || ''', ' ||
+ '''' || tablename || ''', ' ||
+ '0, ' ||
+ '''N'', ' ||
+ '1, ' ||
+ '''Y'', ' ||
+ '''Y'', ' ||
+ '''ASNCDC'', ' ||
+ '''CDC_' || tableschema || '_' || tablename || ''', ' ||
+ '''ASNCDC'', ' ||
+ '''CDC_' || tableschema || '_' || tablename || ''', ' ||
+ 'null, ' ||
+ 'null, ' ||
+ '0, ' ||
+ 'null, ' ||
+ 'null, ' ||
+ 'null, ' ||
+ 'null, ' ||
+ 'null, ' ||
+ 'null, ' ||
+ 'null, ' ||
+ '''0801'', ' ||
+ 'null, ' ||
+ 'null, ' ||
+ '''0'', ' ||
+ '''Y'', ' ||
+ '''N'', ' ||
+ '''Y'', ' ||
+ '''NNNN'', ' ||
+ '''Y'', ' ||
+ '''A'',' ||
+ 'null ) ';
+EXECUTE IMMEDIATE stmtSQL;
+
+SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' ||
+ 'TARGET_SERVER, ' ||
+ 'TARGET_OWNER, ' ||
+ 'TARGET_TABLE, ' ||
+ 'SYNCHTIME, ' ||
+ 'SYNCHPOINT, ' ||
+ 'SOURCE_OWNER, ' ||
+ 'SOURCE_TABLE, ' ||
+ 'SOURCE_VIEW_QUAL, ' ||
+ 'APPLY_QUAL, ' ||
+ 'SET_NAME, ' ||
+ 'CNTL_SERVER , ' ||
+ 'TARGET_STRUCTURE , ' ||
+ 'CNTL_ALIAS , ' ||
+ 'PHYS_CHANGE_OWNER , ' ||
+ 'PHYS_CHANGE_TABLE , ' ||
+ 'MAP_ID ' ||
+ ') VALUES ( ' ||
+ '''KAFKA'', ' ||
+ '''' || tableschema || ''', ' ||
+ '''' || tablename || ''', ' ||
+ 'NULL, ' ||
+ 'NULL, ' ||
+ '''' || tableschema || ''', ' ||
+ '''' || tablename || ''', ' ||
+ '0, ' ||
+ '''KAFKAQUAL'', ' ||
+ '''SET001'', ' ||
+ ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
+ '8, ' ||
+ ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
+ '''ASNCDC'', ' ||
+ '''CDC_' || tableschema || '_' || tablename || ''', ' ||
+ ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' ||
+ ' )';
+EXECUTE IMMEDIATE stmtSQL;
+
+END P1@
+--#SET TERMINATOR ;
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/asncdctables.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdctables.sql
new file mode 100644
index 000000000..5efb795bd
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdctables.sql
@@ -0,0 +1,492 @@
+-- Copyright 2022 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- DB2 ARCH_LEVEL codes: 1021 = Version 10.2.0, 1150 = Version 11.5.0
+
+CREATE TABLE ASNCDC.IBMQREP_COLVERSION(
+LSN VARCHAR( 16) FOR BIT DATA NOT NULL,
+TABLEID1 SMALLINT NOT NULL,
+TABLEID2 SMALLINT NOT NULL,
+POSITION SMALLINT NOT NULL,
+NAME VARCHAR(128) NOT NULL,
+TYPE SMALLINT NOT NULL,
+LENGTH INTEGER NOT NULL,
+NULLS CHAR( 1) NOT NULL,
+DEFAULT VARCHAR(1536),
+CODEPAGE INTEGER,
+SCALE INTEGER,
+VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT )
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMQREP_COLVERSIOX
+ON ASNCDC.IBMQREP_COLVERSION(
+LSN ASC,
+TABLEID1 ASC,
+TABLEID2 ASC,
+POSITION ASC);
+
+CREATE INDEX ASNCDC.IX2COLVERSION
+ON ASNCDC.IBMQREP_COLVERSION(
+TABLEID1 ASC,
+TABLEID2 ASC);
+
+CREATE TABLE ASNCDC.IBMQREP_TABVERSION(
+LSN VARCHAR( 16) FOR BIT DATA NOT NULL,
+TABLEID1 SMALLINT NOT NULL,
+TABLEID2 SMALLINT NOT NULL,
+VERSION INTEGER NOT NULL,
+SOURCE_OWNER VARCHAR(128) NOT NULL,
+SOURCE_NAME VARCHAR(128) NOT NULL,
+VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT )
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMQREP_TABVERSIOX
+ON ASNCDC.IBMQREP_TABVERSION(
+LSN ASC,
+TABLEID1 ASC,
+TABLEID2 ASC,
+VERSION ASC);
+
+CREATE INDEX ASNCDC.IX2TABVERSION
+ON ASNCDC.IBMQREP_TABVERSION(
+TABLEID1 ASC,
+TABLEID2 ASC);
+
+CREATE INDEX ASNCDC.IX3TABVERSION
+ON ASNCDC.IBMQREP_TABVERSION(
+SOURCE_OWNER ASC,
+SOURCE_NAME ASC);
+
+CREATE TABLE ASNCDC.IBMSNAP_APPLEVEL(
+ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021')
+ORGANIZE BY ROW;
+
+INSERT INTO ASNCDC.IBMSNAP_APPLEVEL(ARCH_LEVEL) VALUES (
+'1021');
+
+CREATE TABLE ASNCDC.IBMSNAP_CAPMON(
+MONITOR_TIME TIMESTAMP NOT NULL,
+RESTART_TIME TIMESTAMP NOT NULL,
+CURRENT_MEMORY INT NOT NULL,
+CD_ROWS_INSERTED INT NOT NULL,
+RECAP_ROWS_SKIPPED INT NOT NULL,
+TRIGR_ROWS_SKIPPED INT NOT NULL,
+CHG_ROWS_SKIPPED INT NOT NULL,
+TRANS_PROCESSED INT NOT NULL,
+TRANS_SPILLED INT NOT NULL,
+MAX_TRANS_SIZE INT NOT NULL,
+LOCKING_RETRIES INT NOT NULL,
+JRN_LIB CHAR( 10),
+JRN_NAME CHAR( 10),
+LOGREADLIMIT INT NOT NULL,
+CAPTURE_IDLE INT NOT NULL,
+SYNCHTIME TIMESTAMP NOT NULL,
+CURRENT_LOG_TIME TIMESTAMP NOT NULL WITH DEFAULT ,
+LAST_EOL_TIME TIMESTAMP,
+RESTART_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT ,
+CURRENT_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT ,
+RESTART_MAXCMTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT ,
+LOGREAD_API_TIME INT,
+NUM_LOGREAD_CALLS INT,
+NUM_END_OF_LOGS INT,
+LOGRDR_SLEEPTIME INT,
+NUM_LOGREAD_F_CALLS INT,
+TRANS_QUEUED INT,
+NUM_WARNTXS INT,
+NUM_WARNLOGAPI INT)
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPMONX
+ON ASNCDC.IBMSNAP_CAPMON(
+MONITOR_TIME ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_CAPMON VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_CAPPARMS(
+RETENTION_LIMIT INT,
+LAG_LIMIT INT,
+COMMIT_INTERVAL INT,
+PRUNE_INTERVAL INT,
+TRACE_LIMIT INT,
+MONITOR_LIMIT INT,
+MONITOR_INTERVAL INT,
+MEMORY_LIMIT SMALLINT,
+REMOTE_SRC_SERVER CHAR( 18),
+AUTOPRUNE CHAR( 1),
+TERM CHAR( 1),
+AUTOSTOP CHAR( 1),
+LOGREUSE CHAR( 1),
+LOGSTDOUT CHAR( 1),
+SLEEP_INTERVAL SMALLINT,
+CAPTURE_PATH VARCHAR(1040),
+STARTMODE VARCHAR( 10),
+LOGRDBUFSZ INT NOT NULL WITH DEFAULT 256,
+ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021',
+COMPATIBILITY CHAR( 4) NOT NULL WITH DEFAULT '1021')
+ ORGANIZE BY ROW;
+
+INSERT INTO ASNCDC.IBMSNAP_CAPPARMS(
+RETENTION_LIMIT,
+LAG_LIMIT,
+COMMIT_INTERVAL,
+PRUNE_INTERVAL,
+TRACE_LIMIT,
+MONITOR_LIMIT,
+MONITOR_INTERVAL,
+MEMORY_LIMIT,
+SLEEP_INTERVAL,
+AUTOPRUNE,
+TERM,
+AUTOSTOP,
+LOGREUSE,
+LOGSTDOUT,
+CAPTURE_PATH,
+STARTMODE,
+COMPATIBILITY)
+VALUES (
+10080,
+10080,
+30,
+300,
+10080,
+10080,
+300,
+32,
+5,
+'Y',
+'Y',
+'N',
+'N',
+'N',
+NULL,
+'WARMSI',
+'1021'
+);
+
+CREATE TABLE ASNCDC.IBMSNAP_CAPSCHEMAS (
+ CAP_SCHEMA_NAME VARCHAR(128 OCTETS) NOT NULL
+ )
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPSCHEMASX
+ ON ASNCDC.IBMSNAP_CAPSCHEMAS
+ (CAP_SCHEMA_NAME ASC);
+
+INSERT INTO ASNCDC.IBMSNAP_CAPSCHEMAS(CAP_SCHEMA_NAME) VALUES (
+'ASNCDC');
+
+CREATE TABLE ASNCDC.IBMSNAP_CAPTRACE(
+OPERATION CHAR( 8) NOT NULL,
+TRACE_TIME TIMESTAMP NOT NULL,
+DESCRIPTION VARCHAR(1024) NOT NULL)
+ ORGANIZE BY ROW;
+
+CREATE INDEX ASNCDC.IBMSNAP_CAPTRACEX
+ON ASNCDC.IBMSNAP_CAPTRACE(
+TRACE_TIME ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_CAPTRACE VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_PRUNCNTL(
+TARGET_SERVER CHAR(18) NOT NULL,
+TARGET_OWNER VARCHAR(128) NOT NULL,
+TARGET_TABLE VARCHAR(128) NOT NULL,
+SYNCHTIME TIMESTAMP,
+SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
+SOURCE_OWNER VARCHAR(128) NOT NULL,
+SOURCE_TABLE VARCHAR(128) NOT NULL,
+SOURCE_VIEW_QUAL SMALLINT NOT NULL,
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+CNTL_SERVER CHAR( 18) NOT NULL,
+TARGET_STRUCTURE SMALLINT NOT NULL,
+CNTL_ALIAS CHAR( 8),
+PHYS_CHANGE_OWNER VARCHAR(128),
+PHYS_CHANGE_TABLE VARCHAR(128),
+MAP_ID VARCHAR(10) NOT NULL)
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX
+ON ASNCDC.IBMSNAP_PRUNCNTL(
+SOURCE_OWNER ASC,
+SOURCE_TABLE ASC,
+SOURCE_VIEW_QUAL ASC,
+APPLY_QUAL ASC,
+SET_NAME ASC,
+TARGET_SERVER ASC,
+TARGET_TABLE ASC,
+TARGET_OWNER ASC);
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX1
+ON ASNCDC.IBMSNAP_PRUNCNTL(
+MAP_ID ASC);
+
+CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX2
+ON ASNCDC.IBMSNAP_PRUNCNTL(
+PHYS_CHANGE_OWNER ASC,
+PHYS_CHANGE_TABLE ASC);
+
+CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX3
+ON ASNCDC.IBMSNAP_PRUNCNTL(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+TARGET_SERVER ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_PRUNCNTL VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_PRUNE_LOCK(
+DUMMY CHAR( 1))
+ ORGANIZE BY ROW;
+
+CREATE TABLE ASNCDC.IBMSNAP_PRUNE_SET(
+TARGET_SERVER CHAR( 18) NOT NULL,
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+SYNCHTIME TIMESTAMP,
+SYNCHPOINT VARCHAR( 16) FOR BIT DATA NOT NULL)
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNE_SETX
+ON ASNCDC.IBMSNAP_PRUNE_SET(
+TARGET_SERVER ASC,
+APPLY_QUAL ASC,
+SET_NAME ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_PRUNE_SET VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_REGISTER(
+SOURCE_OWNER VARCHAR(128) NOT NULL,
+SOURCE_TABLE VARCHAR(128) NOT NULL,
+SOURCE_VIEW_QUAL SMALLINT NOT NULL,
+GLOBAL_RECORD CHAR( 1) NOT NULL,
+SOURCE_STRUCTURE SMALLINT NOT NULL,
+SOURCE_CONDENSED CHAR( 1) NOT NULL,
+SOURCE_COMPLETE CHAR( 1) NOT NULL,
+CD_OWNER VARCHAR(128),
+CD_TABLE VARCHAR(128),
+PHYS_CHANGE_OWNER VARCHAR(128),
+PHYS_CHANGE_TABLE VARCHAR(128),
+CD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
+CD_NEW_SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
+DISABLE_REFRESH SMALLINT NOT NULL,
+CCD_OWNER VARCHAR(128),
+CCD_TABLE VARCHAR(128),
+CCD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
+SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
+SYNCHTIME TIMESTAMP,
+CCD_CONDENSED CHAR( 1),
+CCD_COMPLETE CHAR( 1),
+ARCH_LEVEL CHAR( 4) NOT NULL,
+DESCRIPTION CHAR(254),
+BEFORE_IMG_PREFIX VARCHAR( 4),
+CONFLICT_LEVEL CHAR( 1),
+CHG_UPD_TO_DEL_INS CHAR( 1),
+CHGONLY CHAR( 1),
+RECAPTURE CHAR( 1),
+OPTION_FLAGS CHAR( 4) NOT NULL,
+STOP_ON_ERROR CHAR( 1) WITH DEFAULT 'Y',
+STATE CHAR( 1) WITH DEFAULT 'I',
+STATE_INFO CHAR( 8))
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_REGISTERX
+ON ASNCDC.IBMSNAP_REGISTER(
+SOURCE_OWNER ASC,
+SOURCE_TABLE ASC,
+SOURCE_VIEW_QUAL ASC);
+
+CREATE INDEX ASNCDC.IBMSNAP_REGISTERX1
+ON ASNCDC.IBMSNAP_REGISTER(
+PHYS_CHANGE_OWNER ASC,
+PHYS_CHANGE_TABLE ASC);
+
+CREATE INDEX ASNCDC.IBMSNAP_REGISTERX2
+ON ASNCDC.IBMSNAP_REGISTER(
+GLOBAL_RECORD ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_REGISTER VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_RESTART(
+MAX_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL,
+MAX_COMMIT_TIME TIMESTAMP NOT NULL,
+MIN_INFLIGHTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL,
+CURR_COMMIT_TIME TIMESTAMP NOT NULL,
+CAPTURE_FIRST_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL)
+ ORGANIZE BY ROW;
+
+CREATE TABLE ASNCDC.IBMSNAP_SIGNAL(
+SIGNAL_TIME TIMESTAMP NOT NULL WITH DEFAULT ,
+SIGNAL_TYPE VARCHAR( 30) NOT NULL,
+SIGNAL_SUBTYPE VARCHAR( 30),
+SIGNAL_INPUT_IN VARCHAR(500),
+SIGNAL_STATE CHAR( 1) NOT NULL,
+SIGNAL_LSN VARCHAR( 16) FOR BIT DATA)
+DATA CAPTURE CHANGES
+ ORGANIZE BY ROW;
+
+CREATE INDEX ASNCDC.IBMSNAP_SIGNALX
+ON ASNCDC.IBMSNAP_SIGNAL(
+SIGNAL_TIME ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SIGNAL VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_SUBS_COLS(
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+WHOS_ON_FIRST CHAR( 1) NOT NULL,
+TARGET_OWNER VARCHAR(128) NOT NULL,
+TARGET_TABLE VARCHAR(128) NOT NULL,
+COL_TYPE CHAR( 1) NOT NULL,
+TARGET_NAME VARCHAR(128) NOT NULL,
+IS_KEY CHAR( 1) NOT NULL,
+COLNO SMALLINT NOT NULL,
+EXPRESSION VARCHAR(1024) NOT NULL)
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_COLSX
+ON ASNCDC.IBMSNAP_SUBS_COLS(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC,
+TARGET_OWNER ASC,
+TARGET_TABLE ASC,
+TARGET_NAME ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_COLS VOLATILE CARDINALITY;
+
+--CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_EVENTX
+--ON ASNCDC.IBMSNAP_SUBS_EVENT(
+--EVENT_NAME ASC,
+--EVENT_TIME ASC);
+
+
+--ALTER TABLE ASNCDC.IBMSNAP_SUBS_EVENT VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_SUBS_MEMBR(
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+WHOS_ON_FIRST CHAR( 1) NOT NULL,
+SOURCE_OWNER VARCHAR(128) NOT NULL,
+SOURCE_TABLE VARCHAR(128) NOT NULL,
+SOURCE_VIEW_QUAL SMALLINT NOT NULL,
+TARGET_OWNER VARCHAR(128) NOT NULL,
+TARGET_TABLE VARCHAR(128) NOT NULL,
+TARGET_CONDENSED CHAR( 1) NOT NULL,
+TARGET_COMPLETE CHAR( 1) NOT NULL,
+TARGET_STRUCTURE SMALLINT NOT NULL,
+PREDICATES VARCHAR(1024),
+MEMBER_STATE CHAR( 1),
+TARGET_KEY_CHG CHAR( 1) NOT NULL,
+UOW_CD_PREDICATES VARCHAR(1024),
+JOIN_UOW_CD CHAR( 1),
+LOADX_TYPE SMALLINT,
+LOADX_SRC_N_OWNER VARCHAR( 128),
+LOADX_SRC_N_TABLE VARCHAR(128))
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_MEMBRX
+ON ASNCDC.IBMSNAP_SUBS_MEMBR(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC,
+SOURCE_OWNER ASC,
+SOURCE_TABLE ASC,
+SOURCE_VIEW_QUAL ASC,
+TARGET_OWNER ASC,
+TARGET_TABLE ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_MEMBR VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_SUBS_SET(
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+SET_TYPE CHAR( 1) NOT NULL,
+WHOS_ON_FIRST CHAR( 1) NOT NULL,
+ACTIVATE SMALLINT NOT NULL,
+SOURCE_SERVER CHAR( 18) NOT NULL,
+SOURCE_ALIAS CHAR( 8),
+TARGET_SERVER CHAR( 18) NOT NULL,
+TARGET_ALIAS CHAR( 8),
+STATUS SMALLINT NOT NULL,
+LASTRUN TIMESTAMP NOT NULL,
+REFRESH_TYPE CHAR( 1) NOT NULL,
+SLEEP_MINUTES INT,
+EVENT_NAME CHAR( 18),
+LASTSUCCESS TIMESTAMP,
+SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
+SYNCHTIME TIMESTAMP,
+CAPTURE_SCHEMA VARCHAR(128) NOT NULL,
+TGT_CAPTURE_SCHEMA VARCHAR(128),
+FEDERATED_SRC_SRVR VARCHAR( 18),
+FEDERATED_TGT_SRVR VARCHAR( 18),
+JRN_LIB CHAR( 10),
+JRN_NAME CHAR( 10),
+OPTION_FLAGS CHAR( 4) NOT NULL,
+COMMIT_COUNT SMALLINT,
+MAX_SYNCH_MINUTES SMALLINT,
+AUX_STMTS SMALLINT NOT NULL,
+ARCH_LEVEL CHAR( 4) NOT NULL)
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_SETX
+ON ASNCDC.IBMSNAP_SUBS_SET(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_SET VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_SUBS_STMTS(
+APPLY_QUAL CHAR( 18) NOT NULL,
+SET_NAME CHAR( 18) NOT NULL,
+WHOS_ON_FIRST CHAR( 1) NOT NULL,
+BEFORE_OR_AFTER CHAR( 1) NOT NULL,
+STMT_NUMBER SMALLINT NOT NULL,
+EI_OR_CALL CHAR( 1) NOT NULL,
+SQL_STMT VARCHAR(1024),
+ACCEPT_SQLSTATES VARCHAR( 50))
+ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_STMTSX
+ON ASNCDC.IBMSNAP_SUBS_STMTS(
+APPLY_QUAL ASC,
+SET_NAME ASC,
+WHOS_ON_FIRST ASC,
+BEFORE_OR_AFTER ASC,
+STMT_NUMBER ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_SUBS_STMTS VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_UOW(
+IBMSNAP_UOWID CHAR( 10) FOR BIT DATA NOT NULL,
+IBMSNAP_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL,
+IBMSNAP_LOGMARKER TIMESTAMP NOT NULL,
+IBMSNAP_AUTHTKN VARCHAR(30) NOT NULL,
+IBMSNAP_AUTHID VARCHAR(128) NOT NULL,
+IBMSNAP_REJ_CODE CHAR( 1) NOT NULL WITH DEFAULT ,
+IBMSNAP_APPLY_QUAL CHAR( 18) NOT NULL WITH DEFAULT )
+ ORGANIZE BY ROW;
+
+CREATE UNIQUE INDEX ASNCDC.IBMSNAP_UOWX
+ON ASNCDC.IBMSNAP_UOW(
+IBMSNAP_COMMITSEQ ASC,
+IBMSNAP_LOGMARKER ASC);
+
+ALTER TABLE ASNCDC.IBMSNAP_UOW VOLATILE CARDINALITY;
+
+CREATE TABLE ASNCDC.IBMSNAP_CAPENQ (
+ LOCK_NAME CHAR(9 OCTETS)
+ )
+ ORGANIZE BY ROW
+ DATA CAPTURE NONE
+ COMPRESS NO;
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/cdcsetup.sh b/flink-connector-db2-cdc/src/test/resources/db2_server/cdcsetup.sh
new file mode 100644
index 000000000..02e00fafc
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/cdcsetup.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+################################################################################
+# Copyright 2022 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [ ! -f /asncdctools/src/asncdc.nlk ]; then
+  rc=1
+  echo "Waiting for user db2inst1 to exist ..."
+  while [ "$rc" -ne 0 ]
+  do
+    sleep 5
+    id db2inst1
+    rc=$?
+    echo '.'
+  done
+
+  su -c "/asncdctools/src/dbsetup.sh $DBNAME" - db2inst1
+fi
+touch /asncdctools/src/asncdc.nlk
+
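+# Db2TestBase's log consumer waits for this exact message; keep it in sync with the Java side.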
+echo "The asncdc program enable finished"
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/column_type_test.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/column_type_test.sql
new file mode 100644
index 000000000..00e3a29a2
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/column_type_test.sql
@@ -0,0 +1,42 @@
+-- Copyright 2022 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: column_type_test
+-- ----------------------------------------------------------------------------------------------------------------
+
+CREATE TABLE DB2INST1.FULL_TYPES (
+ ID INTEGER NOT NULL,
+ SMALL_C SMALLINT,
+ INT_C INTEGER,
+ BIG_C BIGINT,
+ REAL_C REAL,
+ DOUBLE_C DOUBLE,
+ NUMERIC_C NUMERIC(10, 5),
+ DECIMAL_C DECIMAL(10, 1),
+ VARCHAR_C VARCHAR(200),
+ CHAR_C CHAR,
+ CHARACTER_C CHAR(3),
+ TIMESTAMP_C TIMESTAMP,
+ DATE_C DATE,
+ TIME_C TIME,
+ DEFAULT_NUMERIC_C NUMERIC,
+ TIMESTAMP_PRECISION_C TIMESTAMP(9),
+ PRIMARY KEY (ID)
+);
+
+
+INSERT INTO DB2INST1.FULL_TYPES VALUES (
+ 1, 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443,
+ 'Hello World', 'a', 'abc', '2020-07-17 18:00:22.123', '2020-07-17', '18:00:22', 500,
+ '2020-07-17 18:00:22.123456789');
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/dbsetup.sh b/flink-connector-db2-cdc/src/test/resources/db2_server/dbsetup.sh
new file mode 100644
index 000000000..5cd0c9222
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/dbsetup.sh
@@ -0,0 +1,66 @@
+#!/bin/bash
+################################################################################
+# Copyright 2022 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
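+# bldrtn (DB2's sample build script for C routines) compiles asncdc into a shared library.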
+echo "Compile ASN tool ..."
+cd /asncdctools/src
+/opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
+
+DBNAME=$1
+DB2DIR=/opt/ibm/db2/V11.5
+rc=1
+echo "Waiting for DB2 start ( $DBNAME ) ."
+while [ "$rc" -ne 0 ]
+do
+ sleep 5
+ db2 connect to $DBNAME
+ rc=$?
+ echo '.'
+done
+
+# enable metacatalog read via JDBC
+cd $HOME/sqllib/bnd
+db2 bind db2schema.bnd blocking all grant public sqlerror continue
+
+# do a backup and restart the db
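+# (with archive logging enabled the database stays in backup-pending state until
+# a backup completes; backing up to /dev/null is enough to clear that state)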
+db2 backup db $DBNAME to /dev/null
+db2 restart db $DBNAME
+
+db2 connect to $DBNAME
+
+cp /asncdctools/src/asncdc /database/config/db2inst1/sqllib/function
+chmod 777 /database/config/db2inst1/sqllib/function
+
+# add UDF / start stop asncap
+db2 -tvmf /asncdctools/src/asncdc_UDF.sql
+
+# create asntables
+db2 -tvmf /asncdctools/src/asncdctables.sql
+
+# add UDF to add/remove asn tables
+db2 -tvmf /asncdctools/src/asncdcaddremove.sql
+
+# create sample table and data
+db2 -tvmf /asncdctools/src/inventory.sql
+db2 -tvmf /asncdctools/src/column_type_test.sql
+db2 -tvmf /asncdctools/src/startup-agent.sql
+sleep 10
+db2 -tvmf /asncdctools/src/startup-cdc-demo.sql
+
+echo "db2 setup done"
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/inventory.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/inventory.sql
new file mode 100644
index 000000000..2adc6e914
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/inventory.sql
@@ -0,0 +1,70 @@
+-- Copyright 2022 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Create and populate our test products tables, each using a single insert with many rows
+CREATE TABLE DB2INST1.PRODUCTS (
+ ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY
+ (START WITH 101, INCREMENT BY 1) PRIMARY KEY,
+ NAME VARCHAR(255) NOT NULL,
+ DESCRIPTION VARCHAR(512),
+ WEIGHT FLOAT
+);
+
+CREATE TABLE DB2INST1.PRODUCTS1 (
+ ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY
+ (START WITH 101, INCREMENT BY 1) PRIMARY KEY,
+ NAME VARCHAR(255) NOT NULL,
+ DESCRIPTION VARCHAR(512),
+ WEIGHT FLOAT
+);
+
+CREATE TABLE DB2INST1.PRODUCTS2 (
+ ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY
+ (START WITH 101, INCREMENT BY 1) PRIMARY KEY,
+ NAME VARCHAR(255) NOT NULL,
+ DESCRIPTION VARCHAR(512),
+ WEIGHT FLOAT
+);
+
+INSERT INTO DB2INST1.PRODUCTS(NAME,DESCRIPTION,WEIGHT)
+VALUES ('scooter','Small 2-wheel scooter',3.14),
+ ('car battery','12V car battery',8.1),
+ ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
+ ('hammer','12oz carpenter''s hammer',0.75),
+ ('hammer','14oz carpenter''s hammer',0.875),
+ ('hammer','16oz carpenter''s hammer',1.0),
+ ('rocks','box of assorted rocks',5.3),
+ ('jacket','water resistent black wind breaker',0.1),
+ ('spare tire','24 inch spare tire',22.2);
+
+INSERT INTO DB2INST1.PRODUCTS1(NAME,DESCRIPTION,WEIGHT)
+VALUES ('scooter','Small 2-wheel scooter',3.14),
+ ('car battery','12V car battery',8.1),
+ ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
+ ('hammer','12oz carpenter''s hammer',0.75),
+ ('hammer','14oz carpenter''s hammer',0.875),
+ ('hammer','16oz carpenter''s hammer',1.0),
+ ('rocks','box of assorted rocks',5.3),
+ ('jacket','water resistent black wind breaker',0.1),
+ ('spare tire','24 inch spare tire',22.2);
+
+INSERT INTO DB2INST1.PRODUCTS2(NAME,DESCRIPTION,WEIGHT)
+VALUES ('scooter','Small 2-wheel scooter',3.14),
+ ('car battery','12V car battery',8.1),
+ ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
+ ('hammer','12oz carpenter''s hammer',0.75),
+ ('hammer','14oz carpenter''s hammer',0.875),
+ ('hammer','16oz carpenter''s hammer',1.0),
+ ('rocks','box of assorted rocks',5.3),
+ ('jacket','water resistent black wind breaker',0.1),
+ ('spare tire','24 inch spare tire',22.2);
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/startup-agent.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/startup-agent.sql
new file mode 100644
index 000000000..230ca11ab
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/startup-agent.sql
@@ -0,0 +1,15 @@
+-- Copyright 2022 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
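+-- Start the ASN capture agent (asncdc) via the service UDF installed by asncdc_UDF.sql.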
+VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');
diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/startup-cdc-demo.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/startup-cdc-demo.sql
new file mode 100644
index 000000000..3d502c179
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/db2_server/startup-cdc-demo.sql
@@ -0,0 +1,22 @@
+-- Copyright 2022 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
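+-- Check the capture agent status, register the test tables for capture, then reinitialize the agent so it picks them up.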
+VALUES ASNCDC.ASNCDCSERVICES('status','asncdc');
+
+CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS');
+CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS1');
+CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS2');
+CALL ASNCDC.ADDTABLE('DB2INST1', 'FULL_TYPES');
+
+VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');
diff --git a/flink-connector-db2-cdc/src/test/resources/log4j2-test.properties b/flink-connector-db2-cdc/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..9df04b09f
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+################################################################################
+# Copyright 2022 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_OUT
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/pom.xml b/pom.xml
index 77d46bf48..a7af197af 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,6 +41,7 @@ under the License.
flink-connector-oceanbase-cdc
flink-connector-sqlserver-cdc
flink-connector-tidb-cdc
+ flink-connector-db2-cdc
flink-sql-connector-mysql-cdc
flink-sql-connector-postgres-cdc
flink-sql-connector-mongodb-cdc
@@ -267,6 +268,7 @@ under the License.
**/*.txt
flink-connector-mysql-cdc/src/test/resources/file/*.json
+ flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile