From 5d005809e9db7d6d327be9e258ec9b6d874023fa Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Fri, 28 Oct 2022 16:39:33 +0800 Subject: [PATCH] [db2] Introduce Db2 cdc connector (#450) --- flink-connector-db2-cdc/pom.xml | 134 ++++ .../cdc/connectors/db2/Db2Source.java | 134 ++++ .../db2/table/Db2ReadableMetaData.java | 122 ++++ .../connectors/db2/table/Db2TableSource.java | 225 +++++++ .../db2/table/Db2TableSourceFactory.java | 179 ++++++ .../cdc/connectors/db2/table/StartupMode.java | 27 + .../connectors/db2/table/StartupOptions.java | 70 +++ .../org.apache.flink.table.factories.Factory | 15 + .../cdc/connectors/db2/Db2SourceTest.java | 584 ++++++++++++++++++ .../cdc/connectors/db2/Db2TestBase.java | 106 ++++ .../db2/table/Db2ConnectorITCase.java | 448 ++++++++++++++ .../db2/table/Db2TableSourceFactoryTest.java | 255 ++++++++ .../src/test/resources/db2_server/Dockerfile | 36 ++ .../src/test/resources/db2_server/asncdc.c | 178 ++++++ .../test/resources/db2_server/asncdc_UDF.sql | 30 + .../resources/db2_server/asncdcaddremove.sql | 204 ++++++ .../resources/db2_server/asncdctables.sql | 492 +++++++++++++++ .../src/test/resources/db2_server/cdcsetup.sh | 33 + .../resources/db2_server/column_type_test.sql | 42 ++ .../src/test/resources/db2_server/dbsetup.sh | 70 +++ .../test/resources/db2_server/inventory.sql | 70 +++ .../resources/db2_server/startup-agent.sql | 14 + .../resources/db2_server/startup-cdc-demo.sql | 21 + .../src/test/resources/log4j2-test.properties | 26 + pom.xml | 2 + 25 files changed, 3517 insertions(+) create mode 100644 flink-connector-db2-cdc/pom.xml create mode 100644 flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/Db2Source.java create mode 100644 flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2ReadableMetaData.java create mode 100644 flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSource.java create mode 100644 flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactory.java create mode 100644 flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupMode.java create mode 100644 flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupOptions.java create mode 100644 flink-connector-db2-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2SourceTest.java create mode 100644 flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2TestBase.java create mode 100644 flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2ConnectorITCase.java create mode 100644 flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/asncdc.c create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/asncdc_UDF.sql create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/asncdcaddremove.sql create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/asncdctables.sql create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/cdcsetup.sh create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/column_type_test.sql create mode 100644 
flink-connector-db2-cdc/src/test/resources/db2_server/dbsetup.sh create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/inventory.sql create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/startup-agent.sql create mode 100644 flink-connector-db2-cdc/src/test/resources/db2_server/startup-cdc-demo.sql create mode 100644 flink-connector-db2-cdc/src/test/resources/log4j2-test.properties diff --git a/flink-connector-db2-cdc/pom.xml b/flink-connector-db2-cdc/pom.xml new file mode 100644 index 000000000..0812f2730 --- /dev/null +++ b/flink-connector-db2-cdc/pom.xml @@ -0,0 +1,134 @@ + + + + + flink-cdc-connectors + com.ververica + 2.3-SNAPSHOT + + 4.0.0 + + flink-connector-db2-cdc + flink-connector-db2-cdc + jar + + + + + com.ververica + flink-connector-debezium + ${project.version} + + + kafka-log4j-appender + org.apache.kafka + + + + + + io.debezium + debezium-connector-db2 + ${debezium.version} + + + + + org.testcontainers + db2 + ${testcontainers.version} + test + + + + + com.ibm.db2.jcc + db2jcc + db2jcc4 + test + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + + org.apache.flink + flink-test-utils-junit + ${flink.version} + test + + + + com.ververica + flink-connector-test-util + ${project.version} + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-streaming-java + test + test-jar + ${flink.version} + + + + org.apache.flink + flink-tests + ${flink.version} + test + test-jar + + + + com.jayway.jsonpath + json-path + 2.4.0 + test + + + + org.apache.flink + flink-table-common + ${flink.version} + test + test-jar + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + + diff --git a/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/Db2Source.java b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/Db2Source.java new file mode 100644 index 000000000..4e333533d --- /dev/null +++ b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/Db2Source.java @@ -0,0 +1,134 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.db2; + +import com.ververica.cdc.connectors.db2.table.StartupOptions; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.Validator; +import io.debezium.connector.db2.Db2Connector; + +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Source for DB2 CDC connector. */ +public class Db2Source { + private static final String DB2_DATABASE_SERVER_NAME = "db2_cdc_source"; + + public static Builder builder() { + return new Builder<>(); + } + + /** + * Builder for Db2Source. 
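+     *
+     * <p>A minimal usage sketch (illustrative only; the table name, credentials, and the JSON
+     * deserializer are assumptions, not defaults of this builder):
+     *
+     * <pre>{@code
+     * DebeziumSourceFunction<String> source =
+     *         Db2Source.<String>builder()
+     *                 .hostname("localhost")
+     *                 .port(50000)
+     *                 .database("testdb")
+     *                 .tableList("DB2INST1.PRODUCTS")
+     *                 .username("db2inst1")
+     *                 .password("flinkpw")
+     *                 .deserializer(new JsonDebeziumDeserializationSchema())
+     *                 .build();
+     * env.addSource(source).print();
+     * }</pre>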
+     *
+     * @param <T> Output type of the source
+     */
+    public static class Builder<T> {
+        private String hostname;
+        private int port = 50000;
+        private String username;
+        private String password;
+        private String database;
+        // Should be in "schema.table" format
+        private String[] tableList;
+        private Properties dbzProperties;
+        private StartupOptions startupOptions = StartupOptions.initial();
+        private DebeziumDeserializationSchema<T> deserializer;
+
+        public DebeziumSourceFunction<T> build() {
+            Properties props = new Properties();
+            props.setProperty("connector.class", Db2Connector.class.getCanonicalName());
+            props.setProperty("database.hostname", checkNotNull(hostname));
+            props.setProperty("database.port", String.valueOf(port));
+            props.setProperty("database.user", checkNotNull(username));
+            props.setProperty("database.password", checkNotNull(password));
+            props.setProperty("database.dbname", checkNotNull(database));
+            props.setProperty("database.server.name", DB2_DATABASE_SERVER_NAME); // Hard-coded here
+            props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
+
+            if (tableList != null) {
+                props.setProperty("table.whitelist", String.join(",", tableList));
+            }
+            if (dbzProperties != null) {
+                props.putAll(dbzProperties);
+            }
+            switch (startupOptions.startupMode) {
+                case INITIAL:
+                    props.setProperty("snapshot.mode", "initial");
+                    break;
+
+                case LATEST_OFFSET:
+                    props.setProperty("snapshot.mode", "schema_only");
+                    break;
+
+                default:
+                    throw new UnsupportedOperationException();
+            }
+
+            return new DebeziumSourceFunction<>(
+                    deserializer, props, null, Validator.getDefaultValidator());
+        }
+
+        public Builder<T> hostname(String hostname) {
+            this.hostname = hostname;
+            return this;
+        }
+
+        public Builder<T> port(int port) {
+            this.port = port;
+            return this;
+        }
+
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        public Builder<T> database(String database) {
+            this.database = database;
+            return this;
+        }
+
+        public Builder<T> tableList(String... tableList) {
+            this.tableList = tableList;
+            return this;
+        }
+
+        public Builder<T> debeziumProperties(Properties debeziumProperties) {
+            this.dbzProperties = debeziumProperties;
+            return this;
+        }
+
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        /** Specifies the startup options. */
+        public Builder<T> startupOptions(StartupOptions startupOptions) {
+            this.startupOptions = startupOptions;
+            return this;
+        }
+    }
+}
diff --git a/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2ReadableMetaData.java b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2ReadableMetaData.java
new file mode 100644
index 000000000..219ec3380
--- /dev/null
+++ b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2ReadableMetaData.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.db2.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** Defines the supported metadata columns for {@link Db2TableSource}. */
+public enum Db2ReadableMetaData {
+
+    /** Name of the table that contains the row. */
+    TABLE_NAME(
+            "table_name",
+            DataTypes.STRING().notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return StringData.fromString(
+                            sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+                }
+            }),
+    /** Name of the schema that contains the row. */
+    SCHEMA_NAME(
+            "schema_name",
+            DataTypes.STRING().notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return StringData.fromString(
+                            sourceStruct.getString(AbstractSourceInfo.SCHEMA_NAME_KEY));
+                }
+            }),
+
+    /** Name of the database that contains the row. */
+    DATABASE_NAME(
+            "database_name",
+            DataTypes.STRING().notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return StringData.fromString(
+                            sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+                }
+            }),
+
+    /**
+     * The time when the change was made in the database. If the record is read from a
+     * snapshot of the table instead of the change stream, the value is always 0.
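+     * For example, every row emitted by the initial snapshot carries an op_ts of
+     * 1970-01-01 00:00:00Z (epoch millisecond 0).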
+ */ + OP_TS( + "op_ts", + DataTypes.TIMESTAMP_LTZ(3).notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct messageStruct = (Struct) record.value(); + Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE); + return TimestampData.fromEpochMillis( + (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY)); + } + }); + + private final String key; + + private final DataType dataType; + + private final MetadataConverter converter; + + Db2ReadableMetaData(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + + public String getKey() { + return key; + } + + public DataType getDataType() { + return dataType; + } + + public MetadataConverter getConverter() { + return converter; + } +} diff --git a/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSource.java b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSource.java new file mode 100644 index 000000000..c47122d34 --- /dev/null +++ b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSource.java @@ -0,0 +1,225 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.db2.table; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import com.ververica.cdc.connectors.db2.Db2Source; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.table.MetadataConverter; +import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; + +import java.time.ZoneId; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** TableSource for DB2 CDC connector. */ +public class Db2TableSource implements ScanTableSource, SupportsReadingMetadata { + + private final ResolvedSchema physicalSchema; + /** Data type that describes the final output of the source. 
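+     * Initially the physical row type; widened with the requested metadata columns when
+     * {@code applyReadableMetadata} is called.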
*/ + protected DataType producedDataType; + + private final int port; + private final String hostname; + private final String database; + private final String schemaName; + private final String tableName; + private final String username; + private final String password; + private final ZoneId serverTimeZone; + private final StartupOptions startupOptions; + private final Properties dbzProperties; + + /** Metadata that is appended at the end of a physical source row. */ + protected List metadataKeys; + + public Db2TableSource( + ResolvedSchema physicalSchema, + int port, + String hostname, + String database, + String schemaName, + String tableName, + String username, + String password, + ZoneId serverTimeZone, + Properties dbzProperties, + StartupOptions startupOptions) { + this.physicalSchema = physicalSchema; + this.port = port; + this.hostname = hostname; + this.database = database; + this.schemaName = schemaName; + this.tableName = tableName; + this.username = username; + this.password = password; + this.serverTimeZone = serverTimeZone; + this.dbzProperties = dbzProperties; + this.startupOptions = startupOptions; + this.producedDataType = physicalSchema.toPhysicalRowDataType(); + this.metadataKeys = Collections.emptyList(); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.DELETE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.UPDATE_BEFORE) + .build(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + RowType physicalDataType = + (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); + MetadataConverter[] metadataConverters = getMetadataConverters(); + final TypeInformation typeInfo = + scanContext.createTypeInformation(producedDataType); + DebeziumDeserializationSchema deserializer = + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType(physicalDataType) + .setMetadataConverters(metadataConverters) + .setResultTypeInfo(typeInfo) + .setServerTimeZone(serverTimeZone) + .build(); + Db2Source.Builder builder = + Db2Source.builder() + .hostname(hostname) + .port(port) + .database(database) + .tableList(schemaName + "." 
+ tableName) + .username(username) + .password(password) + .debeziumProperties(dbzProperties) + .deserializer(deserializer) + .startupOptions(startupOptions); + DebeziumSourceFunction sourceFunction = builder.build(); + return SourceFunctionProvider.of(sourceFunction, false); + } + + private MetadataConverter[] getMetadataConverters() { + if (metadataKeys.isEmpty()) { + return new MetadataConverter[0]; + } + + return metadataKeys.stream() + .map( + key -> + Stream.of(Db2ReadableMetaData.values()) + .filter(m -> m.getKey().equals(key)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(Db2ReadableMetaData::getConverter) + .toArray(MetadataConverter[]::new); + } + + @Override + public DynamicTableSource copy() { + Db2TableSource source = + new Db2TableSource( + physicalSchema, + port, + hostname, + database, + schemaName, + tableName, + username, + password, + serverTimeZone, + dbzProperties, + startupOptions); + source.metadataKeys = metadataKeys; + source.producedDataType = producedDataType; + return source; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Db2TableSource)) { + return false; + } + Db2TableSource that = (Db2TableSource) o; + return port == that.port + && Objects.equals(physicalSchema, that.physicalSchema) + && Objects.equals(hostname, that.hostname) + && Objects.equals(database, that.database) + && Objects.equals(schemaName, that.schemaName) + && Objects.equals(tableName, that.tableName) + && Objects.equals(username, that.username) + && Objects.equals(password, that.password) + && Objects.equals(serverTimeZone, that.serverTimeZone) + && Objects.equals(dbzProperties, that.dbzProperties) + && Objects.equals(metadataKeys, that.metadataKeys); + } + + @Override + public int hashCode() { + return Objects.hash( + physicalSchema, + port, + hostname, + database, + schemaName, + tableName, + username, + password, + serverTimeZone, + dbzProperties, + metadataKeys); + } + + @Override + public String asSummaryString() { + return "DB2-CDC"; + } + + @Override + public Map listReadableMetadata() { + return Stream.of(Db2ReadableMetaData.values()) + .collect( + Collectors.toMap( + Db2ReadableMetaData::getKey, Db2ReadableMetaData::getDataType)); + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + this.metadataKeys = metadataKeys; + this.producedDataType = producedDataType; + } +} diff --git a/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactory.java b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactory.java new file mode 100644 index 000000000..e9f2e7ccc --- /dev/null +++ b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactory.java @@ -0,0 +1,179 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.connectors.db2.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.time.ZoneId; +import java.util.HashSet; +import java.util.Set; + +import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; +import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; + +/** Table source factory for DB2 CDC connector. */ +public class Db2TableSourceFactory implements DynamicTableSourceFactory { + + private static final String IDENTIFIER = "db2-cdc"; + + private static final ConfigOption HOSTNAME = + ConfigOptions.key("hostname") + .stringType() + .noDefaultValue() + .withDescription("IP address or hostname of the DB2 database server."); + + private static final ConfigOption PORT = + ConfigOptions.key("port") + .intType() + .defaultValue(50000) + .withDescription("Integer port number of the DB2 database server."); + + private static final ConfigOption USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription( + "Username of the DB2 database to use when connecting to the DB2 database server."); + + private static final ConfigOption PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription("Password to use when connecting to the DB2 database server."); + + private static final ConfigOption DATABASE_NAME = + ConfigOptions.key("database-name") + .stringType() + .noDefaultValue() + .withDescription("Database name of the DB2 server to monitor."); + + private static final ConfigOption SCHEMA_NAME = + ConfigOptions.key("schema-name") + .stringType() + .noDefaultValue() + .withDescription("Schema name of the DB2 database to monitor."); + + private static final ConfigOption TABLE_NAME = + ConfigOptions.key("table-name") + .stringType() + .noDefaultValue() + .withDescription( + "Table name of the DB2 database to monitor. 
This name should not include schema name."); + + private static final ConfigOption SERVER_TIME_ZONE = + ConfigOptions.key("server-time-zone") + .stringType() + .defaultValue("UTC") + .withDescription("The session time zone in database server."); + + public static final ConfigOption SCAN_STARTUP_MODE = + ConfigOptions.key("scan.startup.mode") + .stringType() + .defaultValue("initial") + .withDescription( + "Optional startup mode for Db2 CDC consumer, the valid modes are " + + "\"initial\", \"latest-offset\""); + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX); + + final ReadableConfig config = helper.getOptions(); + String hostname = config.get(HOSTNAME); + String username = config.get(USERNAME); + String password = config.get(PASSWORD); + String databaseName = config.get(DATABASE_NAME); + String schemaName = config.get(SCHEMA_NAME); + String tableName = config.get(TABLE_NAME); + int port = config.get(PORT); + ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE)); + ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); + StartupOptions startupOptions = getStartupOptions(config); + + return new Db2TableSource( + physicalSchema, + port, + hostname, + databaseName, + schemaName, + tableName, + username, + password, + serverTimeZone, + getDebeziumProperties(context.getCatalogTable().getOptions()), + startupOptions); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(HOSTNAME); + options.add(DATABASE_NAME); + options.add(SCHEMA_NAME); + options.add(TABLE_NAME); + options.add(USERNAME); + options.add(PASSWORD); + return options; + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(PORT); + options.add(SERVER_TIME_ZONE); + options.add(SCAN_STARTUP_MODE); + return options; + } + + private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; + + private static StartupOptions getStartupOptions(ReadableConfig config) { + String modeString = config.get(SCAN_STARTUP_MODE); + + switch (modeString.toLowerCase()) { + case SCAN_STARTUP_MODE_VALUE_INITIAL: + return StartupOptions.initial(); + + case SCAN_STARTUP_MODE_VALUE_LATEST: + return StartupOptions.latest(); + + default: + throw new ValidationException( + String.format( + "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s", + SCAN_STARTUP_MODE.key(), + SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_LATEST, + modeString)); + } + } +} diff --git a/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupMode.java b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupMode.java new file mode 100644 index 000000000..cabc3119c --- /dev/null +++ b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupMode.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.db2.table;
+
+/**
+ * Startup modes for the Db2 CDC Consumer.
+ *
+ * @see StartupOptions
+ */
+public enum StartupMode {
+    INITIAL,
+    LATEST_OFFSET
+}
diff --git a/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupOptions.java b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupOptions.java
new file mode 100644
index 000000000..573089403
--- /dev/null
+++ b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupOptions.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.db2.table;
+
+import java.util.Objects;
+
+/** Debezium startup options. */
+public final class StartupOptions {
+    public final StartupMode startupMode;
+
+    /**
+     * Performs an initial snapshot of the monitored database tables upon first startup, and
+     * continues to read change events from the database's redo logs.
+     */
+    public static StartupOptions initial() {
+        return new StartupOptions(StartupMode.INITIAL);
+    }
+
+    /**
+     * Never performs a snapshot of the monitored database tables upon first startup; reads only
+     * from the end of the change stream, i.e. only changes made after the connector starts are
+     * captured.
+     */
+    public static StartupOptions latest() {
+        return new StartupOptions(StartupMode.LATEST_OFFSET);
+    }
+
+    private StartupOptions(StartupMode startupMode) {
+        this.startupMode = startupMode;
+
+        switch (startupMode) {
+            case INITIAL:
+            case LATEST_OFFSET:
+                break;
+            default:
+                throw new UnsupportedOperationException(startupMode + " mode is not supported.");
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        StartupOptions that = (StartupOptions) o;
+        return startupMode == that.startupMode;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(startupMode);
+    }
+}
diff --git a/flink-connector-db2-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-db2-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..e37e75dd2
--- /dev/null
+++ b/flink-connector-db2-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,15 @@
+# Copyright 2022 Ververica Inc.
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.ververica.cdc.connectors.db2.table.Db2TableSourceFactory diff --git a/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2SourceTest.java b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2SourceTest.java new file mode 100644 index 000000000..4b394d1b6 --- /dev/null +++ b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2SourceTest.java @@ -0,0 +1,584 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.db2; + +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import com.jayway.jsonpath.JsonPath; +import com.ververica.cdc.connectors.utils.TestSourceContext; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import static com.ververica.cdc.connectors.utils.AssertUtils.assertDelete; +import static com.ververica.cdc.connectors.utils.AssertUtils.assertInsert; +import static 
com.ververica.cdc.connectors.utils.AssertUtils.assertRead; +import static com.ververica.cdc.connectors.utils.AssertUtils.assertUpdate; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.testcontainers.containers.Db2Container.DB2_PORT; + +/** Test for {@link Db2Source} which also heavily tests {@link DebeziumSourceFunction}. */ +public class Db2SourceTest extends Db2TestBase { + + @Test + public void testConsumingAllEvents() throws Exception { + DebeziumSourceFunction source = createDb2Source("DB2INST1.PRODUCTS1"); + TestSourceContext sourceContext = new TestSourceContext<>(); + + setupSource(source); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + // start the source + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + List records = drain(sourceContext, 9); + assertEquals(9, records.size()); + for (int i = 0; i < records.size(); i++) { + assertRead(records.get(i), "ID", 101 + i); + } + + statement.execute( + "INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'robot','Toy robot',1.304)"); // 110 + records = drain(sourceContext, 1); + assertInsert(records.get(0), "ID", 110); + + statement.execute( + "INSERT INTO DB2INST1.PRODUCTS1 VALUES (1001,'roy','old robot',1234.56)"); // 1001 + records = drain(sourceContext, 1); + assertInsert(records.get(0), "ID", 1001); + + // --------------------------------------------------------------------------------------------------------------- + // Changing the primary key of a row should result in 2 events: INSERT, DELETE + // (TOMBSTONE is dropped) + // --------------------------------------------------------------------------------------------------------------- + statement.execute( + "UPDATE DB2INST1.PRODUCTS1 SET ID=2001, DESCRIPTION='really old robot' WHERE ID=1001"); + records = drain(sourceContext, 2); + assertDelete(records.get(0), "ID", 1001); + assertInsert(records.get(1), "ID", 2001); + + // --------------------------------------------------------------------------------------------------------------- + // Simple UPDATE (with no schema changes) + // --------------------------------------------------------------------------------------------------------------- + statement.execute("UPDATE DB2INST1.PRODUCTS1 SET WEIGHT=1345.67 WHERE ID=2001"); + records = drain(sourceContext, 1); + assertUpdate(records.get(0), "ID", 2001); + + // --------------------------------------------------------------------------------------------------------------- + // Change our schema with a fully-qualified name; we should still see this event + // --------------------------------------------------------------------------------------------------------------- + // Add a column with default to the 'products' table and explicitly update one record + // ... 
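+            // Note: the assertion below only checks the key of the resulting UPDATE event,
+            // so the new VOLUME/ALIAS columns only need to round-trip through the capture
+            // table. (An assumption of this test setup, not a connector guarantee.)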
+ statement.execute( + "ALTER TABLE DB2INST1.PRODUCTS1 ADD COLUMN VOLUME FLOAT ADD COLUMN ALIAS VARCHAR(30) NULL"); + statement.execute("UPDATE DB2INST1.PRODUCTS1 SET VOLUME=13.5 WHERE ID=2001"); + records = drain(sourceContext, 1); + assertUpdate(records.get(0), "ID", 2001); + + // cleanup + source.cancel(); + source.close(); + runThread.sync(); + } + } + + @Test + public void testCheckpointAndRestore() throws Exception { + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + String prevLsn = ""; + { + // --------------------------------------------------------------------------- + // Step-1: start the source from empty state + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source = + createDb2Source("DB2INST1.PRODUCTS2"); + // we use blocking context to block the source to emit before last snapshot record + final BlockingSourceContext sourceContext = + new BlockingSourceContext<>(8); + // setup source with empty state + setupSource(source, null, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + // wait until consumer is started + int received = drain(sourceContext, 2).size(); + assertEquals(2, received); + + // we can't perform checkpoint during DB snapshot + assertFalse( + waitForCheckpointLock( + sourceContext.getCheckpointLock(), Duration.ofSeconds(3))); + + // unblock the source context to continue the processing + sourceContext.blocker.release(); + // wait until the source finishes the database snapshot + List records = drain(sourceContext, 9 - received); + assertEquals(9, records.size() + received); + + // state is still empty + assertEquals(0, offsetState.list.size()); + assertEquals(0, historyState.list.size()); + + // --------------------------------------------------------------------------- + // Step-2: trigger checkpoint-1 after snapshot finished + // --------------------------------------------------------------------------- + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("db2_cdc_source", JsonPath.read(state, "$.sourcePartition.server")); + + String lsn = JsonPath.read(state, "$.sourceOffset.commit_lsn"); + assertTrue(lsn.compareTo(prevLsn) > 0); + prevLsn = lsn; + + source.cancel(); + source.close(); + runThread.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-3: restore the source from state + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source2 = + createDb2Source("DB2INST1.PRODUCTS2"); + final TestSourceContext sourceContext2 = new TestSourceContext<>(); + setupSource(source2, 1L, offsetState, historyState, true, 0, 1); + final CheckedThread runThread2 = + new CheckedThread() { + @Override + public void go() throws Exception { + source2.run(sourceContext2); + } + }; + runThread2.start(); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2)); + + try (Connection connection = getJdbcConnection(); + Statement statement = 
connection.createStatement()) {
+
+                statement.execute(
+                        "INSERT INTO DB2INST1.PRODUCTS2 VALUES (default,'robot','Toy robot',1.304)"); // 110
+                List<SourceRecord> records = drain(sourceContext2, 1);
+                assertEquals(1, records.size());
+                assertInsert(records.get(0), "ID", 110);
+
+                // ---------------------------------------------------------------------------
+                // Step-4: trigger checkpoint-2 during DML operations
+                // ---------------------------------------------------------------------------
+                synchronized (sourceContext2.getCheckpointLock()) {
+                    // trigger checkpoint-2
+                    source2.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
+                }
+
+                assertEquals(1, offsetState.list.size());
+                String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
+                assertEquals("db2_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
+                String lsn = JsonPath.read(state, "$.sourceOffset.commit_lsn");
+                assertTrue(lsn.compareTo(prevLsn) > 0);
+
+                // execute 2 more DMLs to generate more change events
+                statement.execute(
+                        "INSERT INTO DB2INST1.PRODUCTS2 VALUES (1001,'roy','old robot',1234.56)"); // 1001
+                statement.execute("UPDATE DB2INST1.PRODUCTS2 SET WEIGHT=1345.67 WHERE ID=1001");
+            }
+
+            // cancel the source
+            source2.cancel();
+            source2.close();
+            runThread2.sync();
+        }
+
+        {
+            // ---------------------------------------------------------------------------
+            // Step-5: restore the source from checkpoint-2
+            // ---------------------------------------------------------------------------
+            final DebeziumSourceFunction<SourceRecord> source3 =
+                    createDb2Source("DB2INST1.PRODUCTS2");
+            final TestSourceContext<SourceRecord> sourceContext3 = new TestSourceContext<>();
+            setupSource(source3, 2L, offsetState, historyState, true, 0, 1);
+
+            // restart the source
+            final CheckedThread runThread3 =
+                    new CheckedThread() {
+                        @Override
+                        public void go() throws Exception {
+                            source3.run(sourceContext3);
+                        }
+                    };
+            runThread3.start();
+
+            // consume the unconsumed change events
+            List<SourceRecord> records = drain(sourceContext3, 2);
+            assertInsert(records.get(0), "ID", 1001);
+            assertUpdate(records.get(1), "ID", 1001);
+
+            // make sure there are no more events
+            assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext3));
+
+            // can continue to receive new events
+            try (Connection connection = getJdbcConnection();
+                    Statement statement = connection.createStatement()) {
+                statement.execute("DELETE FROM DB2INST1.PRODUCTS2 WHERE ID=1001");
+            }
+            records = drain(sourceContext3, 1);
+            assertDelete(records.get(0), "ID", 1001);
+
+            // ---------------------------------------------------------------------------
+            // Step-6: trigger checkpoint-3 to make sure we can continue to further checkpoints
+            // ---------------------------------------------------------------------------
+            synchronized (sourceContext3.getCheckpointLock()) {
+                // checkpoint 3
+                source3.snapshotState(new StateSnapshotContextSynchronousImpl(233, 233));
+            }
+            assertEquals(1, offsetState.list.size());
+            String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
+            assertEquals("db2_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
+            String lsn = JsonPath.read(state, "$.sourceOffset.commit_lsn");
+            assertTrue(lsn.compareTo(prevLsn) > 0);
+
+            source3.cancel();
+            source3.close();
+            runThread3.sync();
+        }
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Utilities
+    // ------------------------------------------------------------------------------------------
+
+    private DebeziumSourceFunction<SourceRecord>
createDb2Source(String tableName) { + return Db2Source.builder() + .hostname(DB2_CONTAINER.getHost()) + .port(DB2_CONTAINER.getMappedPort(DB2_PORT)) + .database(DB2_CONTAINER.getDatabaseName()) + .username(DB2_CONTAINER.getUsername()) + .password(DB2_CONTAINER.getPassword()) + .tableList(tableName) + .deserializer(new ForwardDeserializeSchema()) + .build(); + } + + private List drain(TestSourceContext sourceContext, int expectedRecordCount) + throws Exception { + List allRecords = new ArrayList<>(); + LinkedBlockingQueue> queue = sourceContext.getCollectedOutputs(); + while (allRecords.size() < expectedRecordCount) { + StreamRecord record = queue.poll(100, TimeUnit.SECONDS); + if (record != null) { + allRecords.add(record.getValue()); + } else { + throw new RuntimeException( + "Can't receive " + expectedRecordCount + " elements before timeout."); + } + } + + return allRecords; + } + + private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout) + throws Exception { + final Semaphore semaphore = new Semaphore(0); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute( + () -> { + synchronized (checkpointLock) { + semaphore.release(); + } + }); + boolean result = semaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS); + executor.shutdownNow(); + return result; + } + + /** + * Wait for a maximum amount of time until the first record is available. + * + * @param timeout the maximum amount of time to wait; must not be negative + * @return {@code true} if records are available, or {@code false} if the timeout occurred and + * no records are available + */ + private boolean waitForAvailableRecords(Duration timeout, TestSourceContext sourceContext) + throws InterruptedException { + long now = System.currentTimeMillis(); + long stop = now + timeout.toMillis(); + while (System.currentTimeMillis() < stop) { + if (!sourceContext.getCollectedOutputs().isEmpty()) { + break; + } + Thread.sleep(10); // save CPU + } + return !sourceContext.getCollectedOutputs().isEmpty(); + } + + private static void setupSource(DebeziumSourceFunction source) throws Exception { + setupSource( + source, null, null, null, + true, // enable checkpointing; auto commit should be ignored + 0, 1); + } + + private static void setupSource( + DebeziumSourceFunction source, + @Nullable Long restoredCheckpointId, + ListState restoredOffsetState, + ListState restoredHistoryState, + boolean isCheckpointingEnabled, + int subtaskIndex, + int totalNumSubtasks) + throws Exception { + + // run setup procedure in operator life cycle + source.setRuntimeContext( + new MockStreamingRuntimeContext( + isCheckpointingEnabled, totalNumSubtasks, subtaskIndex)); + source.initializeState( + new MockFunctionInitializationContext( + restoredCheckpointId, + new MockOperatorStateStore(restoredOffsetState, restoredHistoryState))); + source.open(new Configuration()); + } + + private static class ForwardDeserializeSchema + implements DebeziumDeserializationSchema { + + private static final long serialVersionUID = 1L; + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + out.collect(record); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(SourceRecord.class); + } + } + + private static class MockOperatorStateStore implements OperatorStateStore { + + private final ListState restoredOffsetListState; + private final ListState restoredHistoryListState; + + private MockOperatorStateStore( + ListState restoredOffsetListState, 
ListState restoredHistoryListState) { + this.restoredOffsetListState = restoredOffsetListState; + this.restoredHistoryListState = restoredHistoryListState; + } + + @Override + @SuppressWarnings("unchecked") + public ListState getUnionListState(ListStateDescriptor stateDescriptor) + throws Exception { + if (stateDescriptor.getName().equals(DebeziumSourceFunction.OFFSETS_STATE_NAME)) { + return (ListState) restoredOffsetListState; + } else if (stateDescriptor + .getName() + .equals(DebeziumSourceFunction.HISTORY_RECORDS_STATE_NAME)) { + return (ListState) restoredHistoryListState; + } else { + throw new IllegalStateException("Unknown state."); + } + } + + @Override + public BroadcastState getBroadcastState( + MapStateDescriptor stateDescriptor) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public ListState getListState(ListStateDescriptor stateDescriptor) + throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public Set getRegisteredStateNames() { + throw new UnsupportedOperationException(); + } + + @Override + public Set getRegisteredBroadcastStateNames() { + throw new UnsupportedOperationException(); + } + } + + private static class MockFunctionInitializationContext + implements FunctionInitializationContext { + + private final Long restoredCheckpointId; + private final OperatorStateStore operatorStateStore; + + private MockFunctionInitializationContext( + Long restoredCheckpointId, OperatorStateStore operatorStateStore) { + this.restoredCheckpointId = restoredCheckpointId; + this.operatorStateStore = operatorStateStore; + } + + @Override + public boolean isRestored() { + return restoredCheckpointId != null; + } + + @Override + public OptionalLong getRestoredCheckpointId() { + if (restoredCheckpointId == null) { + return OptionalLong.empty(); + } + return OptionalLong.of(restoredCheckpointId); + } + + @Override + public OperatorStateStore getOperatorStateStore() { + return operatorStateStore; + } + + @Override + public KeyedStateStore getKeyedStateStore() { + throw new UnsupportedOperationException(); + } + } + + private static class BlockingSourceContext extends TestSourceContext { + + private final Semaphore blocker = new Semaphore(0); + private final int expectedCount; + private int currentCount = 0; + + private BlockingSourceContext(int expectedCount) { + this.expectedCount = expectedCount; + } + + @Override + public void collect(T t) { + super.collect(t); + currentCount++; + if (currentCount == expectedCount) { + try { + // block the source to emit records + blocker.acquire(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + private static final class TestingListState implements ListState { + + private final List list = new ArrayList<>(); + private boolean clearCalled = false; + + @Override + public void clear() { + list.clear(); + clearCalled = true; + } + + @Override + public Iterable get() throws Exception { + return list; + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + list.add(value); + } + + public List getList() { + return list; + } + + boolean isClearCalled() { + return clearCalled; + } + + @Override + public void update(List values) throws Exception { + clear(); + + addAll(values); + } + + @Override + public void addAll(List values) throws Exception { + if (values != null) { + values.forEach( + v -> Preconditions.checkNotNull(v, "You cannot add null to a ListState.")); + + list.addAll(values); + 
}
+        }
+    }
+}
diff --git a/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2TestBase.java b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2TestBase.java
new file mode 100644
index 000000000..8cc38ef5d
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2TestBase.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.db2;
+
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Db2Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Base class for testing the DB2 source; it starts a DB2 container with CDC (ASN capture)
+ * enabled.
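+ *
+ * <p>The image is built from src/test/resources/db2_server/Dockerfile, which bundles the asncdc
+ * capture program and SQL setup scripts; {@link #startContainers()} blocks until the agent logs
+ * "The asncdc program enable finished".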
+ */
+public class Db2TestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Db2TestBase.class);
+
+    private static final DockerImageName DEBEZIUM_DOCKER_IMAGE_NAME =
+            DockerImageName.parse(
+                            new ImageFromDockerfile("custom/db2-cdc:1.4")
+                                    .withDockerfile(getFilePath("db2_server/Dockerfile"))
+                                    .get())
+                    .asCompatibleSubstituteFor("ibmcom/db2");
+    private static final Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(LOG);
+    private static boolean db2AsnAgentRunning = false;
+
+    protected static final Db2Container DB2_CONTAINER =
+            new Db2Container(DEBEZIUM_DOCKER_IMAGE_NAME)
+                    .withDatabaseName("testdb")
+                    .withUsername("db2inst1")
+                    .withPassword("flinkpw")
+                    .withEnv("AUTOCONFIG", "false")
+                    .withEnv("ARCHIVE_LOGS", "true")
+                    .acceptLicense()
+                    .withLogConsumer(logConsumer)
+                    .withLogConsumer(
+                            outputFrame -> {
+                                if (outputFrame
+                                        .getUtf8String()
+                                        .contains("The asncdc program enable finished")) {
+                                    db2AsnAgentRunning = true;
+                                }
+                            });
+
+    @BeforeClass
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(DB2_CONTAINER)).join();
+        LOG.info("Containers are started.");
+
+        LOG.info("Waiting for db2 asn agent to start...");
+        while (!db2AsnAgentRunning) {
+            try {
+                Thread.sleep(5000L);
+            } catch (InterruptedException e) {
+                LOG.error("unexpected interrupted exception", e);
+            }
+        }
+        LOG.info("Db2 asn agent is started.");
+    }
+
+    protected Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                DB2_CONTAINER.getJdbcUrl(),
+                DB2_CONTAINER.getUsername(),
+                DB2_CONTAINER.getPassword());
+    }
+
+    private static Path getFilePath(String resourceFilePath) {
+        Path path = null;
+        try {
+            URL filePath = Db2TestBase.class.getClassLoader().getResource(resourceFilePath);
+            assertNotNull("Cannot locate " + resourceFilePath, filePath);
+            path = Paths.get(filePath.toURI());
+        } catch (URISyntaxException e) {
+            LOG.error("Cannot get path from URI.", e);
+        }
+        return path;
+    }
+}
diff --git a/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2ConnectorITCase.java b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2ConnectorITCase.java
new file mode 100644
index 000000000..f8120584e
--- /dev/null
+++ b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2ConnectorITCase.java
@@ -0,0 +1,448 @@
+/*
+ * Copyright 2022 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.ververica.cdc.connectors.db2.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.utils.LegacyRowResource; + +import com.ververica.cdc.connectors.db2.Db2TestBase; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.testcontainers.containers.Db2Container.DB2_PORT; + +/** Integration tests for DB2 CDC source. */ +public class Db2ConnectorITCase extends Db2TestBase { + private static final Logger LOG = LoggerFactory.getLogger(Db2ConnectorITCase.class); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + @ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE; + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + env.setParallelism(1); + } + + @Test + public void testConsumingAllEvents() + throws SQLException, InterruptedException, ExecutionException { + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " ID INT NOT NULL," + + " NAME STRING," + + " DESCRIPTION STRING," + + " WEIGHT DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'db2-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'" + + ")", + DB2_CONTAINER.getHost(), + DB2_CONTAINER.getMappedPort(DB2_PORT), + DB2_CONTAINER.getUsername(), + DB2_CONTAINER.getPassword(), + DB2_CONTAINER.getDatabaseName(), + "DB2INST1", + "PRODUCTS"); + String sinkDDL = + "CREATE TABLE sink (" + + " name STRING," + + " weightSum DECIMAL(10,3)," + + " PRIMARY KEY (name) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '20'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME"); + + waitForSnapshotStarted("sink"); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;"); + statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.1' WHERE ID=107;"); + statement.execute( + "INSERT INTO DB2INST1.PRODUCTS VALUES (110,'jacket','water resistent white wind breaker',0.2);"); + statement.execute( + "INSERT INTO DB2INST1.PRODUCTS VALUES (111,'scooter','Big 2-wheel scooter ',5.18);"); + 
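+            // The remaining statements update and delete rows, so the aggregating sink below
+            // must handle -U/+U and -D changelog records rather than inserts only.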
statement.execute( + "UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;"); + statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.17' WHERE ID=111;"); + statement.execute("DELETE FROM DB2INST1.PRODUCTS WHERE ID=111;"); + } + + waitForSinkSize("sink", 20); + + /* + *
+         * The final database table looks like this:
+         *
+         * > SELECT * FROM DB2INST1.PRODUCTS;
+         * +-----+--------------------+---------------------------------------------------------+--------+
+         * | ID  | NAME               | DESCRIPTION                                             | WEIGHT |
+         * +-----+--------------------+---------------------------------------------------------+--------+
+         * | 101 | scooter            | Small 2-wheel scooter                                   |   3.14 |
+         * | 102 | car battery        | 12V car battery                                         |    8.1 |
+         * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |    0.8 |
+         * | 104 | hammer             | 12oz carpenter's hammer                                 |   0.75 |
+         * | 105 | hammer             | 14oz carpenter's hammer                                 |  0.875 |
+         * | 106 | hammer             | 18oz carpenter hammer                                   |      1 |
+         * | 107 | rocks              | box of assorted rocks                                   |    5.1 |
+         * | 108 | jacket             | water resistent black wind breaker                      |    0.1 |
+         * | 109 | spare tire         | 24 inch spare tire                                      |   22.2 |
+         * | 110 | jacket             | new water resistent white wind breaker                  |    0.5 |
+         * +-----+--------------------+---------------------------------------------------------+--------+
+         * 
+ */ + + String[] expected = + new String[] { + "scooter,3.140", + "car battery,8.100", + "12-pack drill bits,0.800", + "hammer,2.625", + "rocks,5.100", + "jacket,0.600", + "spare tire,22.200" + }; + + List<String> actual = TestValuesTableFactory.getResults("sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + + @Test + public void testAllTypes() throws Exception { + // NOTE: db2 is not case-sensitive by default; the schema returned by debezium + // is uppercase, thus we need to use uppercase when defining a db2 table. + String sourceDDL = + String.format( + "CREATE TABLE full_types (\n" + + " ID INTEGER NOT NULL,\n" + // Debezium cannot track db2 boolean type, see: + // https://issues.redhat.com/browse/DBZ-2587 + // + " BOOLEAN_C BOOLEAN NOT NULL,\n" + + " SMALL_C SMALLINT,\n" + + " INT_C INTEGER,\n" + + " BIG_C BIGINT,\n" + + " REAL_C FLOAT,\n" + + " DOUBLE_C DOUBLE,\n" + + " NUMERIC_C DECIMAL(10, 5),\n" + + " DECIMAL_C DECIMAL(10, 1),\n" + + " VARCHAR_C STRING,\n" + + " CHAR_C STRING,\n" + + " CHARACTER_C STRING,\n" + + " TIMESTAMP_C TIMESTAMP(3),\n" + + " DATE_C DATE,\n" + + " TIME_C TIME(0),\n" + + " DEFAULT_NUMERIC_C DECIMAL,\n" + + " TIMESTAMP_PRECISION_C TIMESTAMP(9)\n" + + ") WITH (" + + " 'connector' = 'db2-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'" + + ")", + DB2_CONTAINER.getHost(), + DB2_CONTAINER.getMappedPort(DB2_PORT), + DB2_CONTAINER.getUsername(), + DB2_CONTAINER.getPassword(), + DB2_CONTAINER.getDatabaseName(), + "DB2INST1", + "FULL_TYPES"); + String sinkDDL = + "CREATE TABLE sink (\n" + + " id INTEGER NOT NULL,\n" + + " small_c SMALLINT,\n" + + " int_c INTEGER,\n" + + " big_c BIGINT,\n" + + " real_c FLOAT,\n" + + " double_c DOUBLE,\n" + + " numeric_c DECIMAL(10, 5),\n" + + " decimal_c DECIMAL(10, 1),\n" + + " varchar_c STRING,\n" + + " char_c STRING,\n" + + " character_c STRING,\n" + + " timestamp_c TIMESTAMP(3),\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " default_numeric_c DECIMAL,\n" + + " timestamp_precision_c TIMESTAMP(9)\n" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types"); + + waitForSnapshotStarted("sink"); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("UPDATE DB2INST1.FULL_TYPES SET SMALL_C=0 WHERE ID=1;"); + } + + waitForSinkSize("sink", 3); + + List<String> expected = + Arrays.asList( + "+I(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)", + "-U(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)", + "+U(1,0,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)"); + List<String> actual = TestValuesTableFactory.getRawResults("sink"); + assertEquals(expected, actual); + + result.getJobClient().get().cancel().get(); + } + + @Test + public void testStartupFromLatestOffset() throws Exception { + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " ID INT NOT NULL," + + " NAME STRING," + + " 
DESCRIPTION STRING," + + " WEIGHT DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'db2-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s' ," + + " 'scan.startup.mode' = 'latest-offset'" + + ")", + DB2_CONTAINER.getHost(), + DB2_CONTAINER.getMappedPort(DB2_PORT), + DB2_CONTAINER.getUsername(), + DB2_CONTAINER.getPassword(), + DB2_CONTAINER.getDatabaseName(), + "DB2INST1", + "PRODUCTS1"); + String sinkDDL = + "CREATE TABLE sink " + + " WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ") LIKE debezium_source (EXCLUDING OPTIONS)"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); + // wait for the source startup; we don't have a better way to wait for it, so use sleep for now + do { + Thread.sleep(5000L); + } while (result.getJobClient().get().getJobStatus().get() != RUNNING); + Thread.sleep(30000L); + LOG.info("Snapshot should be finished; the source should now read change events."); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'jacket','water resistent white wind breaker',0.2)"); + statement.execute( + "INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'scooter','Big 2-wheel scooter ',5.18)"); + statement.execute( + "UPDATE DB2INST1.PRODUCTS1 SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110"); + statement.execute("UPDATE DB2INST1.PRODUCTS1 SET WEIGHT='5.17' WHERE ID=111"); + statement.execute("DELETE FROM DB2INST1.PRODUCTS1 WHERE ID=111"); + } + + waitForSinkSize("sink", 7); + + String[] expected = + new String[] {"110,jacket,new water resistent white wind breaker,0.500"}; + + List<String> actual = TestValuesTableFactory.getResults("sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + + @Test + public void testMetadataColumns() throws Throwable { + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " DB_NAME STRING METADATA FROM 'database_name' VIRTUAL," + + " SCHEMA_NAME STRING METADATA FROM 'schema_name' VIRTUAL," + + " TABLE_NAME STRING METADATA FROM 'table_name' VIRTUAL," + + " ID INT NOT NULL," + + " NAME STRING," + + " DESCRIPTION STRING," + + " WEIGHT DECIMAL(10,3)," + + " PRIMARY KEY (ID) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'db2-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'" + + ")", + DB2_CONTAINER.getHost(), + DB2_CONTAINER.getMappedPort(DB2_PORT), + DB2_CONTAINER.getUsername(), + DB2_CONTAINER.getPassword(), + DB2_CONTAINER.getDatabaseName(), + "DB2INST1", + "PRODUCTS2"); + String sinkDDL = + "CREATE TABLE sink (" + + " database_name STRING," + + " schema_name STRING," + + " table_name STRING," + + " id int," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '20'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); + 
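+        // Block until the snapshot phase has produced at least one row before applying DML, so the changes below are captured from the ASN change tables rather than folded into the snapshot. + 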
waitForSnapshotStarted("sink"); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "UPDATE DB2INST1.PRODUCTS2 SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;"); + statement.execute("UPDATE DB2INST1.PRODUCTS2 SET WEIGHT='5.1' WHERE ID=107;"); + statement.execute( + "INSERT INTO DB2INST1.PRODUCTS2 VALUES (110,'jacket','water resistent white wind breaker',0.2);"); + statement.execute( + "INSERT INTO DB2INST1.PRODUCTS2 VALUES (111,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE DB2INST1.PRODUCTS2 SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;"); + statement.execute("UPDATE DB2INST1.PRODUCTS2 SET WEIGHT='5.17' WHERE ID=111;"); + statement.execute("DELETE FROM DB2INST1.PRODUCTS2 WHERE ID=111;"); + } + + waitForSinkSize("sink", 16); + + List expected = + Arrays.asList( + "+I(testdb,DB2INST1,PRODUCTS2,101,scooter,Small 2-wheel scooter,3.140)", + "+I(testdb,DB2INST1,PRODUCTS2,102,car battery,12V car battery,8.100)", + "+I(testdb,DB2INST1,PRODUCTS2,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)", + "+I(testdb,DB2INST1,PRODUCTS2,104,hammer,12oz carpenter's hammer,0.750)", + "+I(testdb,DB2INST1,PRODUCTS2,105,hammer,14oz carpenter's hammer,0.875)", + "+I(testdb,DB2INST1,PRODUCTS2,106,hammer,16oz carpenter's hammer,1.000)", + "+I(testdb,DB2INST1,PRODUCTS2,107,rocks,box of assorted rocks,5.300)", + "+I(testdb,DB2INST1,PRODUCTS2,108,jacket,water resistent black wind breaker,0.100)", + "+I(testdb,DB2INST1,PRODUCTS2,109,spare tire,24 inch spare tire,22.200)", + "+U(testdb,DB2INST1,PRODUCTS2,106,hammer,18oz carpenter hammer,1.000)", + "+U(testdb,DB2INST1,PRODUCTS2,107,rocks,box of assorted rocks,5.100)", + "+I(testdb,DB2INST1,PRODUCTS2,110,jacket,water resistent white wind breaker,0.200)", + "+I(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.180)", + "+U(testdb,DB2INST1,PRODUCTS2,110,jacket,new water resistent white wind breaker,0.500)", + "+U(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.170)", + "-D(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.170)"); + + List actual = TestValuesTableFactory.getRawResults("sink"); + Collections.sort(expected); + Collections.sort(actual); + assertEquals(expected, actual); + result.getJobClient().get().cancel().get(); + } + + private static void waitForSnapshotStarted(String sinkName) throws InterruptedException { + while (sinkSize(sinkName) == 0) { + Thread.sleep(1000L); + } + } + + private static void waitForSinkSize(String sinkName, int expectedSize) + throws InterruptedException { + while (sinkSize(sinkName) < expectedSize) { + Thread.sleep(1000L); + } + } + + private static int sinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getRawResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } +} diff --git a/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java new file mode 100644 index 000000000..b9b80d6f8 --- /dev/null +++ b/flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java @@ -0,0 +1,255 @@ +/* + * Copyright 2022 Ververica Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.db2.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.apache.flink.util.ExceptionUtils; + +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import org.junit.Test; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction; +import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; +import static org.apache.flink.table.api.TableSchema.fromResolvedSchema; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Test for {@link Db2TableSource} created by {@link Db2TableSourceFactory}. 
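+ * + * <p>Each test builds a source from textual options via {@link FactoryUtil} and compares it with a manually constructed {@link Db2TableSource}, so option parsing, validation, and metadata handling are exercised without a running database. + 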
*/ +public class Db2TableSourceFactoryTest { + + private static final ResolvedSchema SCHEMA = + new ResolvedSchema( + Arrays.asList( + Column.physical("aaa", DataTypes.INT().notNull()), + Column.physical("bbb", DataTypes.STRING().notNull()), + Column.physical("ccc", DataTypes.DOUBLE()), + Column.physical("ddd", DataTypes.DECIMAL(31, 18)), + Column.physical("eee", DataTypes.TIMESTAMP(3))), + new ArrayList<>(), + UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa"))); + + private static final ResolvedSchema SCHEMA_WITH_METADATA = + new ResolvedSchema( + Arrays.asList( + Column.physical("aaa", DataTypes.INT().notNull()), + Column.physical("bbb", DataTypes.STRING().notNull()), + Column.physical("ccc", DataTypes.DOUBLE()), + Column.physical("ddd", DataTypes.DECIMAL(31, 18)), + Column.physical("eee", DataTypes.TIMESTAMP(3)), + Column.metadata( + "database_name", DataTypes.STRING(), "database_name", true), + Column.metadata("table_name", DataTypes.STRING(), "table_name", true), + Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true), + Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true)), + Collections.emptyList(), + UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa"))); + + private static final String MY_LOCALHOST = "localhost"; + private static final String MY_USERNAME = "flinkuser"; + private static final String MY_PASSWORD = "flinkpw"; + private static final String MY_DATABASE = "myDB"; + private static final String MY_SCHEMA = "flinkuser"; + private static final String MY_TABLE = "myTable"; + private static final Properties PROPERTIES = new Properties(); + + @Test + public void testCommonProperties() { + Map<String, String> properties = getAllOptions(); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties, SCHEMA); + Db2TableSource expectedSource = + new Db2TableSource( + getPhysicalSchema(SCHEMA), + 50000, + MY_LOCALHOST, + MY_DATABASE, + MY_SCHEMA, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("UTC"), + PROPERTIES, + StartupOptions.initial()); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testOptionalProperties() { + Map<String, String> options = getAllOptions(); + options.put("port", "50000"); + options.put("server-time-zone", "Asia/Shanghai"); + options.put("debezium.snapshot.mode", "schema_only"); + + DynamicTableSource actualSource = createTableSource(options, SCHEMA); + Properties dbzProperties = new Properties(); + dbzProperties.put("snapshot.mode", "schema_only"); + Db2TableSource expectedSource = + new Db2TableSource( + getPhysicalSchema(SCHEMA), + 50000, + MY_LOCALHOST, + MY_DATABASE, + MY_SCHEMA, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("Asia/Shanghai"), + dbzProperties, + StartupOptions.latest()); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testValidation() { + // validate illegal port + try { + Map<String, String> properties = getAllOptions(); + properties.put("port", "123b"); + + createTableSource(properties, SCHEMA); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage( + t, "Could not parse value '123b' for key 'port'.") + .isPresent()); + } + + // validate missing required + Factory factory = new Db2TableSourceFactory(); + for (ConfigOption<?> requiredOption : factory.requiredOptions()) { + Map<String, String> properties = getAllOptions(); + properties.remove(requiredOption.key()); + + try { + createTableSource(properties, SCHEMA); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + 
ExceptionUtils.findThrowableWithMessage( + t, + "Missing required options are:\n\n" + requiredOption.key()) + .isPresent()); + } + } + + // validate unsupported option + try { + Map<String, String> properties = getAllOptions(); + properties.put("unknown", "abc"); + + createTableSource(properties, SCHEMA); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage(t, "Unsupported options:\n\nunknown") + .isPresent()); + } + } + + @Test + public void testMetadataColumns() { + Map<String, String> properties = getAllOptions(); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties, SCHEMA_WITH_METADATA); + Db2TableSource db2TableSource = (Db2TableSource) actualSource; + db2TableSource.applyReadableMetadata( + Arrays.asList("op_ts", "database_name", "table_name", "schema_name"), + SCHEMA_WITH_METADATA.toSourceRowDataType()); + actualSource = db2TableSource.copy(); + Db2TableSource expectedSource = + new Db2TableSource( + SCHEMA_WITH_METADATA, + 50000, + MY_LOCALHOST, + MY_DATABASE, + MY_SCHEMA, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("UTC"), + new Properties(), + StartupOptions.initial()); + expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); + expectedSource.metadataKeys = + Arrays.asList("op_ts", "database_name", "table_name", "schema_name"); + + assertEquals(expectedSource, actualSource); + + ScanTableSource.ScanRuntimeProvider provider = + db2TableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + DebeziumSourceFunction<RowData> debeziumSourceFunction = + (DebeziumSourceFunction<RowData>) + ((SourceFunctionProvider) provider).createSourceFunction(); + assertProducedTypeOfSourceFunction(debeziumSourceFunction, expectedSource.producedDataType); + } + + private Map<String, String> getAllOptions() { + Map<String, String> options = new HashMap<>(); + options.put("connector", "db2-cdc"); + options.put("hostname", MY_LOCALHOST); + options.put("database-name", MY_DATABASE); + options.put("schema-name", MY_SCHEMA); + options.put("table-name", MY_TABLE); + options.put("username", MY_USERNAME); + options.put("password", MY_PASSWORD); + return options; + } + + private static DynamicTableSource createTableSource( + Map<String, String> options, ResolvedSchema schema) { + return FactoryUtil.createTableSource( + null, + ObjectIdentifier.of("default", "default", "t1"), + new ResolvedCatalogTable( + CatalogTable.of( + fromResolvedSchema(schema).toSchema(), + "mock source", + new ArrayList<>(), + options), + schema), + new Configuration(), + Db2TableSourceFactoryTest.class.getClassLoader(), + false); + } +} diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile b/flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile new file mode 100644 index 000000000..42a117d25 --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile @@ -0,0 +1,36 @@ +################################################################################ +# Copyright 2022 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +FROM ibmcom/db2:11.5.0.0a + +MAINTAINER Peter Urbanetz + +RUN mkdir -p /asncdctools/src + +ADD asncdc_UDF.sql /asncdctools/src +ADD asncdcaddremove.sql /asncdctools/src +ADD asncdctables.sql /asncdctools/src +ADD dbsetup.sh /asncdctools/src +ADD startup-agent.sql /asncdctools/src +ADD startup-cdc-demo.sql /asncdctools/src +ADD inventory.sql /asncdctools/src +ADD column_type_test.sql /asncdctools/src +ADD asncdc.c /asncdctools/src + +RUN chmod -R 777 /asncdctools + +RUN mkdir /var/custom +ADD cdcsetup.sh /var/custom +RUN chmod -R 777 /var/custom diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc.c b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc.c new file mode 100644 index 000000000..3d27fe4cb --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc.c @@ -0,0 +1,178 @@ +/***************************************************************************** +* Copyright 2022 Ververica Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +******************************************************************************/ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <sqlca.h> +#include <sqludf.h> + +void SQL_API_FN asncdcservice( + SQLUDF_VARCHAR *asnCommand, /* input */ + SQLUDF_VARCHAR *asnService, + SQLUDF_CLOB *fileData, /* output */ + /* null indicators */ + SQLUDF_NULLIND *asnCommand_ind, /* input */ + SQLUDF_NULLIND *asnService_ind, + SQLUDF_NULLIND *fileData_ind, + SQLUDF_TRAIL_ARGS, + struct sqludf_dbinfo *dbinfo) +{ + + int fd; + char tmpFileName[] = "/tmp/fileXXXXXX"; + fd = mkstemp(tmpFileName); + + int strcheck = 0; + char cmdstring[256]; + + + char* szDb2path = getenv("HOME"); + + + + char str[20]; + int len = 0; + int c; + char *buffer = NULL; + FILE *pidfile; + + char dbname[129]; + memset(dbname, '\0', 129); + strncpy(dbname, (char *)(dbinfo->dbname), dbinfo->dbnamelen); + dbname[dbinfo->dbnamelen] = '\0'; + + int pid; + if (strcmp(asnService, "asncdc") == 0) + { + strcheck = sprintf(cmdstring, "pgrep -fx \"%s/sqllib/bin/asncap capture_schema=%s capture_server=%s\" > %s", szDb2path, asnService, dbname, tmpFileName); + int callcheck; + callcheck = system(cmdstring); + pidfile = fopen(tmpFileName, "r"); + while ((c = fgetc(pidfile)) != EOF) + { + if (c == '\n') + { + break; + } + len++; + } + buffer = (char *)malloc(sizeof(char) * len); + fseek(pidfile, 0, SEEK_SET); + fread(buffer, sizeof(char), len, pidfile); + fclose(pidfile); + pidfile = fopen(tmpFileName, "w"); + if (strcmp(asnCommand, "start") == 0) + { + if (len == 0) // is not running + { + strcheck = sprintf(cmdstring, "%s/sqllib/bin/asncap capture_schema=%s capture_server=%s &", szDb2path, asnService, dbname); + fprintf(pidfile, "start --> %s \n", cmdstring); + callcheck = system(cmdstring); + } + else + { + fprintf(pidfile, "asncap is already running"); + } + } + if ((strcmp(asnCommand, "prune") == 0) || + (strcmp(asnCommand, "reinit") == 0) || + (strcmp(asnCommand, "suspend") == 0) || 
(strcmp(asnCommand, "resume") == 0) || + (strcmp(asnCommand, "status") == 0) || + (strcmp(asnCommand, "stop") == 0)) + { + if (len > 0) + { + //buffer[len] = '\0'; + //strcheck = sprintf(cmdstring, "/bin/kill -SIGINT %s ", buffer); + //fprintf(pidfile, "stop --> %s", cmdstring); + //callcheck = system(cmdstring); + strcheck = sprintf(cmdstring, "%s/sqllib/bin/asnccmd capture_schema=%s capture_server=%s %s >> %s", szDb2path, asnService, dbname, asnCommand, tmpFileName); + //fprintf(pidfile, "%s --> %s \n", cmdstring, asnCommand); + callcheck = system(cmdstring); + } + else + { + fprintf(pidfile, "asncap is not running"); + } + } + + fclose(pidfile); + } + /* system(cmdstring); */ + + int rc = 0; + long fileSize = 0; + size_t readCnt = 0; + FILE *f = NULL; + + f = fopen(tmpFileName, "r"); + if (!f) + { + strcpy(SQLUDF_MSGTX, "Could not open file "); + strncat(SQLUDF_MSGTX, tmpFileName, + SQLUDF_MSGTEXT_LEN - strlen(SQLUDF_MSGTX) - 1); + strncpy(SQLUDF_STATE, "38100", SQLUDF_SQLSTATE_LEN); + return; + } + + rc = fseek(f, 0, SEEK_END); + if (rc) + { + sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc); + strncpy(SQLUDF_STATE, "38101", SQLUDF_SQLSTATE_LEN); + return; + } + + /* verify the file size */ + fileSize = ftell(f); + if (fileSize > fileData->length) + { + strcpy(SQLUDF_MSGTX, "File too large"); + strncpy(SQLUDF_STATE, "38102", SQLUDF_SQLSTATE_LEN); + return; + } + + /* go to the beginning and read the entire file */ + rc = fseek(f, 0, 0); + if (rc) + { + sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc); + strncpy(SQLUDF_STATE, "38103", SQLUDF_SQLSTATE_LEN); + return; + } + + readCnt = fread(fileData->data, 1, fileSize, f); + if (readCnt != fileSize) + { + /* raise a warning that something weird is going on */ + sprintf(SQLUDF_MSGTX, "Could not read entire file " + "(%d vs %d)", + readCnt, fileSize); + strncpy(SQLUDF_STATE, "01H10", SQLUDF_SQLSTATE_LEN); + *fileData_ind = -1; + } + else + { + fileData->length = readCnt; + *fileData_ind = 0; + } + // remove temorary file + rc = remove(tmpFileName); + //fclose(pFile); +} diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc_UDF.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc_UDF.sql new file mode 100644 index 000000000..2f83c549d --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdc_UDF.sql @@ -0,0 +1,30 @@ +-- Copyright 2022 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+ +DROP SPECIFIC FUNCTION ASNCDC.asncdcservice; + +CREATE FUNCTION ASNCDC.ASNCDCSERVICES(command VARCHAR(6), service VARCHAR(8)) + RETURNS CLOB(100K) + SPECIFIC asncdcservice + EXTERNAL NAME 'asncdc!asncdcservice' + LANGUAGE C + PARAMETER STYLE SQL + DBINFO + DETERMINISTIC + NOT FENCED + RETURNS NULL ON NULL INPUT + NO SQL + NO EXTERNAL ACTION + NO SCRATCHPAD + ALLOW PARALLEL + NO FINAL CALL; diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/asncdcaddremove.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdcaddremove.sql new file mode 100644 index 000000000..1ad349799 --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdcaddremove.sql @@ -0,0 +1,204 @@ +-- Copyright 2022 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Define ASNCDC.REMOVETABLE() and ASNCDC.ADDTABLE() +-- ASNCDC.ADDTABLE() puts a table in CDC mode, making the ASNCapture server collect changes for the table +-- ASNCDC.REMOVETABLE() makes the ASNCapture server stop collecting changes for that table + +--#SET TERMINATOR @ +CREATE OR REPLACE PROCEDURE ASNCDC.REMOVETABLE( +in tableschema VARCHAR(128), +in tablename VARCHAR(128) +) +LANGUAGE SQL +P1: +BEGIN + +DECLARE stmtSQL VARCHAR(2048); + +DECLARE SQLCODE INT; +DECLARE SQLSTATE CHAR(5); +DECLARE RC_SQLCODE INT DEFAULT 0; +DECLARE RC_SQLSTATE CHAR(5) DEFAULT '00000'; + +DECLARE CONTINUE HANDLER FOR SQLEXCEPTION, SQLWARNING, NOT FOUND VALUES (SQLCODE, SQLSTATE) INTO RC_SQLCODE, RC_SQLSTATE; + +-- delete ASNCDC.IBMSNAP_PRUNCNTL entries / source +SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_PRUNCNTL WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || ''''; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASNCDC.IBMSNAP_REGISTER entries / source +SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_REGISTER WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || ''''; + EXECUTE IMMEDIATE stmtSQL; + +-- drop CD Table / source +SET stmtSQL = 'DROP TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename ; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASNCDC.IBMSNAP_SUBS_COLS entries /target +SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_COLS WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || ''''; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASNCDC.IBMSNAP_SUBS_MEMBR entries /target +SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_MEMBR WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || ''''; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASNCDC.IBMQREP_COLVERSION +SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_COLVERSION col WHERE EXISTS (SELECT * FROM ASNCDC.IBMQREP_TABVERSION tab WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || ''' AND col.TABLEID1 = tab.TABLEID1 AND col.TABLEID2 = tab.TABLEID2)'; + EXECUTE IMMEDIATE stmtSQL; + +-- delete ASNCDC.IBMQREP_TABVERSION +SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_TABVERSION WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || ''''; + 
EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE NONE'; +EXECUTE IMMEDIATE stmtSQL; + +END P1@ +--#SET TERMINATOR ; + +--#SET TERMINATOR @ +CREATE OR REPLACE PROCEDURE ASNCDC.ADDTABLE( +in tableschema VARCHAR(128), +in tablename VARCHAR(128) +) +LANGUAGE SQL +P1: +BEGIN + +DECLARE SQLSTATE CHAR(5); + +DECLARE stmtSQL VARCHAR(2048); + +SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE CHANGES'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'CREATE TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' AS ( SELECT ' || + ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_COMMITSEQ, ' || + ' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_INTENTSEQ, ' || + ' CAST ('''' AS CHAR(1)) ' || + ' AS IBMSNAP_OPERATION, t.* FROM ' || tableschema || '.' || tablename || ' as t ) WITH NO DATA ORGANIZE BY ROW '; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_COMMITSEQ SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_INTENTSEQ SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ALTER COLUMN IBMSNAP_OPERATION SET NOT NULL'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'CREATE UNIQUE INDEX ASNCDC.IXCDC_' || + tableschema || '_' || tablename || + ' ON ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' ( IBMSNAP_COMMITSEQ ASC, IBMSNAP_INTENTSEQ ASC ) PCTFREE 0 MINPCTUSED 0'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' || + tableschema || '_' || tablename || + ' VOLATILE CARDINALITY'; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_REGISTER (SOURCE_OWNER, SOURCE_TABLE, ' || + 'SOURCE_VIEW_QUAL, GLOBAL_RECORD, SOURCE_STRUCTURE, SOURCE_CONDENSED, ' || + 'SOURCE_COMPLETE, CD_OWNER, CD_TABLE, PHYS_CHANGE_OWNER, ' || + 'PHYS_CHANGE_TABLE, CD_OLD_SYNCHPOINT, CD_NEW_SYNCHPOINT, ' || + 'DISABLE_REFRESH, CCD_OWNER, CCD_TABLE, CCD_OLD_SYNCHPOINT, ' || + 'SYNCHPOINT, SYNCHTIME, CCD_CONDENSED, CCD_COMPLETE, ARCH_LEVEL, ' || + 'DESCRIPTION, BEFORE_IMG_PREFIX, CONFLICT_LEVEL, ' || + 'CHG_UPD_TO_DEL_INS, CHGONLY, RECAPTURE, OPTION_FLAGS, ' || + 'STOP_ON_ERROR, STATE, STATE_INFO ) VALUES( ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + '0, ' || + '''N'', ' || + '1, ' || + '''Y'', ' || + '''Y'', ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + 'null, ' || + 'null, ' || + '0, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + 'null, ' || + '''0801'', ' || + 'null, ' || + 'null, ' || + '''0'', ' || + '''Y'', ' || + '''N'', ' || + '''Y'', ' || + '''NNNN'', ' || + '''Y'', ' || + '''A'',' || + 'null ) '; +EXECUTE IMMEDIATE stmtSQL; + +SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || + 'TARGET_SERVER, ' || + 'TARGET_OWNER, ' || + 'TARGET_TABLE, ' || + 'SYNCHTIME, ' || + 'SYNCHPOINT, ' || + 'SOURCE_OWNER, ' || + 'SOURCE_TABLE, ' || + 'SOURCE_VIEW_QUAL, ' || + 'APPLY_QUAL, ' || + 'SET_NAME, ' || + 'CNTL_SERVER , ' || + 'TARGET_STRUCTURE , ' || + 'CNTL_ALIAS , ' || + 'PHYS_CHANGE_OWNER , ' || + 'PHYS_CHANGE_TABLE , ' || + 'MAP_ID ' || + ') VALUES ( ' || + '''KAFKA'', ' || + '''' || tableschema || ''', ' || + '''' || 
tablename || ''', ' || + 'NULL, ' || + 'NULL, ' || + '''' || tableschema || ''', ' || + '''' || tablename || ''', ' || + '0, ' || + '''KAFKAQUAL'', ' || + '''SET001'', ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '8, ' || + ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || + '''ASNCDC'', ' || + '''CDC_' || tableschema || '_' || tablename || ''', ' || + ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || + ' )'; +EXECUTE IMMEDIATE stmtSQL; + +END P1@ +--#SET TERMINATOR ; diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/asncdctables.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdctables.sql new file mode 100644 index 000000000..5efb795bd --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/asncdctables.sql @@ -0,0 +1,492 @@ +-- Copyright 2022 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- 1021 db2 LEVEL Version 10.2.0 --> 11.5.0 1150 + +CREATE TABLE ASNCDC.IBMQREP_COLVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +POSITION SMALLINT NOT NULL, +NAME VARCHAR(128) NOT NULL, +TYPE SMALLINT NOT NULL, +LENGTH INTEGER NOT NULL, +NULLS CHAR( 1) NOT NULL, +DEFAULT VARCHAR(1536), +CODEPAGE INTEGER, +SCALE INTEGER, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_COLVERSIOX +ON ASNCDC.IBMQREP_COLVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +POSITION ASC); + +CREATE INDEX ASNCDC.IX2COLVERSION +ON ASNCDC.IBMQREP_COLVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE TABLE ASNCDC.IBMQREP_TABVERSION( +LSN VARCHAR( 16) FOR BIT DATA NOT NULL, +TABLEID1 SMALLINT NOT NULL, +TABLEID2 SMALLINT NOT NULL, +VERSION INTEGER NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_NAME VARCHAR(128) NOT NULL, +VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMQREP_TABVERSIOX +ON ASNCDC.IBMQREP_TABVERSION( +LSN ASC, +TABLEID1 ASC, +TABLEID2 ASC, +VERSION ASC); + +CREATE INDEX ASNCDC.IX2TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +TABLEID1 ASC, +TABLEID2 ASC); + +CREATE INDEX ASNCDC.IX3TABVERSION +ON ASNCDC.IBMQREP_TABVERSION( +SOURCE_OWNER ASC, +SOURCE_NAME ASC); + +CREATE TABLE ASNCDC.IBMSNAP_APPLEVEL( +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021') +ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_APPLEVEL(ARCH_LEVEL) VALUES ( +'1021'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME TIMESTAMP NOT NULL, +RESTART_TIME TIMESTAMP NOT NULL, +CURRENT_MEMORY INT NOT NULL, +CD_ROWS_INSERTED INT NOT NULL, +RECAP_ROWS_SKIPPED INT NOT NULL, +TRIGR_ROWS_SKIPPED INT NOT NULL, +CHG_ROWS_SKIPPED INT NOT NULL, +TRANS_PROCESSED INT NOT NULL, +TRANS_SPILLED INT NOT NULL, +MAX_TRANS_SIZE INT NOT NULL, +LOCKING_RETRIES INT NOT NULL, +JRN_LIB CHAR( 10), +JRN_NAME CHAR( 10), +LOGREADLIMIT INT NOT NULL, +CAPTURE_IDLE INT NOT NULL, 
+SYNCHTIME TIMESTAMP NOT NULL, +CURRENT_LOG_TIME TIMESTAMP NOT NULL WITH DEFAULT , +LAST_EOL_TIME TIMESTAMP, +RESTART_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +CURRENT_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +RESTART_MAXCMTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT , +LOGREAD_API_TIME INT, +NUM_LOGREAD_CALLS INT, +NUM_END_OF_LOGS INT, +LOGRDR_SLEEPTIME INT, +NUM_LOGREAD_F_CALLS INT, +TRANS_QUEUED INT, +NUM_WARNTXS INT, +NUM_WARNLOGAPI INT) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPMONX +ON ASNCDC.IBMSNAP_CAPMON( +MONITOR_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPMON VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT INT, +LAG_LIMIT INT, +COMMIT_INTERVAL INT, +PRUNE_INTERVAL INT, +TRACE_LIMIT INT, +MONITOR_LIMIT INT, +MONITOR_INTERVAL INT, +MEMORY_LIMIT SMALLINT, +REMOTE_SRC_SERVER CHAR( 18), +AUTOPRUNE CHAR( 1), +TERM CHAR( 1), +AUTOSTOP CHAR( 1), +LOGREUSE CHAR( 1), +LOGSTDOUT CHAR( 1), +SLEEP_INTERVAL SMALLINT, +CAPTURE_PATH VARCHAR(1040), +STARTMODE VARCHAR( 10), +LOGRDBUFSZ INT NOT NULL WITH DEFAULT 256, +ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021', +COMPATIBILITY CHAR( 4) NOT NULL WITH DEFAULT '1021') + ORGANIZE BY ROW; + +INSERT INTO ASNCDC.IBMSNAP_CAPPARMS( +RETENTION_LIMIT, +LAG_LIMIT, +COMMIT_INTERVAL, +PRUNE_INTERVAL, +TRACE_LIMIT, +MONITOR_LIMIT, +MONITOR_INTERVAL, +MEMORY_LIMIT, +SLEEP_INTERVAL, +AUTOPRUNE, +TERM, +AUTOSTOP, +LOGREUSE, +LOGSTDOUT, +CAPTURE_PATH, +STARTMODE, +COMPATIBILITY) +VALUES ( +10080, +10080, +30, +300, +10080, +10080, +300, +32, +5, +'Y', +'Y', +'N', +'N', +'N', +NULL, +'WARMSI', +'1021' +); + +CREATE TABLE ASNCDC.IBMSNAP_CAPSCHEMAS ( + CAP_SCHEMA_NAME VARCHAR(128 OCTETS) NOT NULL + ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPSCHEMASX + ON ASNCDC.IBMSNAP_CAPSCHEMAS + (CAP_SCHEMA_NAME ASC); + +INSERT INTO ASNCDC.IBMSNAP_CAPSCHEMAS(CAP_SCHEMA_NAME) VALUES ( +'ASNCDC'); + +CREATE TABLE ASNCDC.IBMSNAP_CAPTRACE( +OPERATION CHAR( 8) NOT NULL, +TRACE_TIME TIMESTAMP NOT NULL, +DESCRIPTION VARCHAR(1024) NOT NULL) + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_CAPTRACEX +ON ASNCDC.IBMSNAP_CAPTRACE( +TRACE_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_CAPTRACE VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNCNTL( +TARGET_SERVER CHAR(18) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +CNTL_SERVER CHAR( 18) NOT NULL, +TARGET_STRUCTURE SMALLINT NOT NULL, +CNTL_ALIAS CHAR( 8), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +MAP_ID VARCHAR(10) NOT NULL) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX +ON ASNCDC.IBMSNAP_PRUNCNTL( +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC, +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC, +TARGET_TABLE ASC, +TARGET_OWNER ASC); + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX1 +ON ASNCDC.IBMSNAP_PRUNCNTL( +MAP_ID ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX2 +ON ASNCDC.IBMSNAP_PRUNCNTL( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX3 +ON ASNCDC.IBMSNAP_PRUNCNTL( +APPLY_QUAL ASC, +SET_NAME ASC, +TARGET_SERVER ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNCNTL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_PRUNE_LOCK( +DUMMY CHAR( 1)) + ORGANIZE BY ROW; + +CREATE 
TABLE ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER CHAR( 18) NOT NULL, +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +SYNCHTIME TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA NOT NULL) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNE_SETX +ON ASNCDC.IBMSNAP_PRUNE_SET( +TARGET_SERVER ASC, +APPLY_QUAL ASC, +SET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_PRUNE_SET VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +GLOBAL_RECORD CHAR( 1) NOT NULL, +SOURCE_STRUCTURE SMALLINT NOT NULL, +SOURCE_CONDENSED CHAR( 1) NOT NULL, +SOURCE_COMPLETE CHAR( 1) NOT NULL, +CD_OWNER VARCHAR(128), +CD_TABLE VARCHAR(128), +PHYS_CHANGE_OWNER VARCHAR(128), +PHYS_CHANGE_TABLE VARCHAR(128), +CD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +CD_NEW_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +DISABLE_REFRESH SMALLINT NOT NULL, +CCD_OWNER VARCHAR(128), +CCD_TABLE VARCHAR(128), +CCD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHTIME TIMESTAMP, +CCD_CONDENSED CHAR( 1), +CCD_COMPLETE CHAR( 1), +ARCH_LEVEL CHAR( 4) NOT NULL, +DESCRIPTION CHAR(254), +BEFORE_IMG_PREFIX VARCHAR( 4), +CONFLICT_LEVEL CHAR( 1), +CHG_UPD_TO_DEL_INS CHAR( 1), +CHGONLY CHAR( 1), +RECAPTURE CHAR( 1), +OPTION_FLAGS CHAR( 4) NOT NULL, +STOP_ON_ERROR CHAR( 1) WITH DEFAULT 'Y', +STATE CHAR( 1) WITH DEFAULT 'I', +STATE_INFO CHAR( 8)) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_REGISTERX +ON ASNCDC.IBMSNAP_REGISTER( +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX1 +ON ASNCDC.IBMSNAP_REGISTER( +PHYS_CHANGE_OWNER ASC, +PHYS_CHANGE_TABLE ASC); + +CREATE INDEX ASNCDC.IBMSNAP_REGISTERX2 +ON ASNCDC.IBMSNAP_REGISTER( +GLOBAL_RECORD ASC); + +ALTER TABLE ASNCDC.IBMSNAP_REGISTER VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_RESTART( +MAX_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +MAX_COMMIT_TIME TIMESTAMP NOT NULL, +MIN_INFLIGHTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +CURR_COMMIT_TIME TIMESTAMP NOT NULL, +CAPTURE_FIRST_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL) + ORGANIZE BY ROW; + +CREATE TABLE ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME TIMESTAMP NOT NULL WITH DEFAULT , +SIGNAL_TYPE VARCHAR( 30) NOT NULL, +SIGNAL_SUBTYPE VARCHAR( 30), +SIGNAL_INPUT_IN VARCHAR(500), +SIGNAL_STATE CHAR( 1) NOT NULL, +SIGNAL_LSN VARCHAR( 16) FOR BIT DATA) +DATA CAPTURE CHANGES + ORGANIZE BY ROW; + +CREATE INDEX ASNCDC.IBMSNAP_SIGNALX +ON ASNCDC.IBMSNAP_SIGNAL( +SIGNAL_TIME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SIGNAL VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +COL_TYPE CHAR( 1) NOT NULL, +TARGET_NAME VARCHAR(128) NOT NULL, +IS_KEY CHAR( 1) NOT NULL, +COLNO SMALLINT NOT NULL, +EXPRESSION VARCHAR(1024) NOT NULL) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_COLSX +ON ASNCDC.IBMSNAP_SUBS_COLS( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +TARGET_OWNER ASC, +TARGET_TABLE ASC, +TARGET_NAME ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_COLS VOLATILE CARDINALITY; + +--CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_EVENTX +--ON ASNCDC.IBMSNAP_SUBS_EVENT( +--EVENT_NAME ASC, +--EVENT_TIME ASC); + + +--ALTER TABLE ASNCDC.IBMSNAP_SUBS_EVENT VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_MEMBR( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) 
NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +SOURCE_OWNER VARCHAR(128) NOT NULL, +SOURCE_TABLE VARCHAR(128) NOT NULL, +SOURCE_VIEW_QUAL SMALLINT NOT NULL, +TARGET_OWNER VARCHAR(128) NOT NULL, +TARGET_TABLE VARCHAR(128) NOT NULL, +TARGET_CONDENSED CHAR( 1) NOT NULL, +TARGET_COMPLETE CHAR( 1) NOT NULL, +TARGET_STRUCTURE SMALLINT NOT NULL, +PREDICATES VARCHAR(1024), +MEMBER_STATE CHAR( 1), +TARGET_KEY_CHG CHAR( 1) NOT NULL, +UOW_CD_PREDICATES VARCHAR(1024), +JOIN_UOW_CD CHAR( 1), +LOADX_TYPE SMALLINT, +LOADX_SRC_N_OWNER VARCHAR( 128), +LOADX_SRC_N_TABLE VARCHAR(128)) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_MEMBRX +ON ASNCDC.IBMSNAP_SUBS_MEMBR( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +SOURCE_OWNER ASC, +SOURCE_TABLE ASC, +SOURCE_VIEW_QUAL ASC, +TARGET_OWNER ASC, +TARGET_TABLE ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_MEMBR VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_SET( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +SET_TYPE CHAR( 1) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +ACTIVATE SMALLINT NOT NULL, +SOURCE_SERVER CHAR( 18) NOT NULL, +SOURCE_ALIAS CHAR( 8), +TARGET_SERVER CHAR( 18) NOT NULL, +TARGET_ALIAS CHAR( 8), +STATUS SMALLINT NOT NULL, +LASTRUN TIMESTAMP NOT NULL, +REFRESH_TYPE CHAR( 1) NOT NULL, +SLEEP_MINUTES INT, +EVENT_NAME CHAR( 18), +LASTSUCCESS TIMESTAMP, +SYNCHPOINT VARCHAR( 16) FOR BIT DATA, +SYNCHTIME TIMESTAMP, +CAPTURE_SCHEMA VARCHAR(128) NOT NULL, +TGT_CAPTURE_SCHEMA VARCHAR(128), +FEDERATED_SRC_SRVR VARCHAR( 18), +FEDERATED_TGT_SRVR VARCHAR( 18), +JRN_LIB CHAR( 10), +JRN_NAME CHAR( 10), +OPTION_FLAGS CHAR( 4) NOT NULL, +COMMIT_COUNT SMALLINT, +MAX_SYNCH_MINUTES SMALLINT, +AUX_STMTS SMALLINT NOT NULL, +ARCH_LEVEL CHAR( 4) NOT NULL) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_SETX +ON ASNCDC.IBMSNAP_SUBS_SET( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_SET VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_SUBS_STMTS( +APPLY_QUAL CHAR( 18) NOT NULL, +SET_NAME CHAR( 18) NOT NULL, +WHOS_ON_FIRST CHAR( 1) NOT NULL, +BEFORE_OR_AFTER CHAR( 1) NOT NULL, +STMT_NUMBER SMALLINT NOT NULL, +EI_OR_CALL CHAR( 1) NOT NULL, +SQL_STMT VARCHAR(1024), +ACCEPT_SQLSTATES VARCHAR( 50)) +ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_STMTSX +ON ASNCDC.IBMSNAP_SUBS_STMTS( +APPLY_QUAL ASC, +SET_NAME ASC, +WHOS_ON_FIRST ASC, +BEFORE_OR_AFTER ASC, +STMT_NUMBER ASC); + +ALTER TABLE ASNCDC.IBMSNAP_SUBS_STMTS VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_UOW( +IBMSNAP_UOWID CHAR( 10) FOR BIT DATA NOT NULL, +IBMSNAP_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL, +IBMSNAP_LOGMARKER TIMESTAMP NOT NULL, +IBMSNAP_AUTHTKN VARCHAR(30) NOT NULL, +IBMSNAP_AUTHID VARCHAR(128) NOT NULL, +IBMSNAP_REJ_CODE CHAR( 1) NOT NULL WITH DEFAULT , +IBMSNAP_APPLY_QUAL CHAR( 18) NOT NULL WITH DEFAULT ) + ORGANIZE BY ROW; + +CREATE UNIQUE INDEX ASNCDC.IBMSNAP_UOWX +ON ASNCDC.IBMSNAP_UOW( +IBMSNAP_COMMITSEQ ASC, +IBMSNAP_LOGMARKER ASC); + +ALTER TABLE ASNCDC.IBMSNAP_UOW VOLATILE CARDINALITY; + +CREATE TABLE ASNCDC.IBMSNAP_CAPENQ ( + LOCK_NAME CHAR(9 OCTETS) + ) + ORGANIZE BY ROW + DATA CAPTURE NONE + COMPRESS NO; diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/cdcsetup.sh b/flink-connector-db2-cdc/src/test/resources/db2_server/cdcsetup.sh new file mode 100644 index 000000000..02e00fafc --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/cdcsetup.sh @@ -0,0 +1,33 @@ +#!/bin/bash
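+# Runs at container startup: waits until the db2inst1 user exists, then executes +# dbsetup.sh once (guarded by the asncdc.nlk marker file) and finally prints the +# readiness line that Db2TestBase watches for in the container log.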
+################################################################################ +# Copyright 2022 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +if [ ! -f /asncdctools/src/asncdc.nlk ]; then +rc=1 +echo "waiting for db2inst1 to exist ." +while [ "$rc" -ne 0 ] +do + sleep 5 + id db2inst1 + rc=$? + echo '.' +done + +su -c "/asncdctools/src/dbsetup.sh $DBNAME" - db2inst1 +fi +touch /asncdctools/src/asncdc.nlk + +echo "The asncdc program enable finished" diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/column_type_test.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/column_type_test.sql new file mode 100644 index 000000000..00e3a29a2 --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/column_type_test.sql @@ -0,0 +1,42 @@ +-- Copyright 2022 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: column_type_test +-- ---------------------------------------------------------------------------------------------------------------- + +CREATE TABLE DB2INST1.FULL_TYPES ( + ID INTEGER NOT NULL, + SMALL_C SMALLINT, + INT_C INTEGER, + BIG_C BIGINT, + REAL_C REAL, + DOUBLE_C DOUBLE, + NUMERIC_C NUMERIC(10, 5), + DECIMAL_C DECIMAL(10, 1), + VARCHAR_C VARCHAR(200), + CHAR_C CHAR, + CHARACTER_C CHAR(3), + TIMESTAMP_C TIMESTAMP, + DATE_C DATE, + TIME_C TIME, + DEFAULT_NUMERIC_C NUMERIC, + TIMESTAMP_PRECISION_C TIMESTAMP(9), + PRIMARY KEY (ID) +); + + +INSERT INTO DB2INST1.FULL_TYPES VALUES ( + 1, 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, + 'Hello World', 'a', 'abc', '2020-07-17 18:00:22.123', '2020-07-17', '18:00:22', 500, + '2020-07-17 18:00:22.123456789'); diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/dbsetup.sh b/flink-connector-db2-cdc/src/test/resources/db2_server/dbsetup.sh new file mode 100644 index 000000000..5cd0c9222 --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/dbsetup.sh @@ -0,0 +1,70 @@ +#!/bin/bash +################################################################################ +# Copyright 2022 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +echo "Compile ASN tool ..." +cd /asncdctools/src +/opt/ibm/db2/V11.5/samples/c/bldrtn asncdc + +DBNAME=$1 +DB2DIR=/opt/ibm/db2/V11.5 +rc=1 +echo "Waiting for DB2 to start ( $DBNAME ) ." +while [ "$rc" -ne 0 ] +do + sleep 5 + db2 connect to $DBNAME + rc=$? + echo '.' +done + +# enable metacatalog read via JDBC +cd $HOME/sqllib/bnd +db2 bind db2schema.bnd blocking all grant public sqlerror continue + +# do a backup and restart the db +db2 backup db $DBNAME to /dev/null +db2 restart db $DBNAME + +db2 connect to $DBNAME + +cp /asncdctools/src/asncdc /database/config/db2inst1/sqllib/function +chmod 777 /database/config/db2inst1/sqllib/function + +# add UDF / start stop asncap +db2 -tvmf /asncdctools/src/asncdc_UDF.sql + +# create asntables +db2 -tvmf /asncdctools/src/asncdctables.sql + +# add UDF / add remove asntables + +db2 -tvmf /asncdctools/src/asncdcaddremove.sql + + + + +# create sample table and data +db2 -tvmf /asncdctools/src/inventory.sql +db2 -tvmf /asncdctools/src/column_type_test.sql +db2 -tvmf /asncdctools/src/startup-agent.sql +sleep 10 +db2 -tvmf /asncdctools/src/startup-cdc-demo.sql + + + + +echo "db2 setup done" diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/inventory.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/inventory.sql new file mode 100644 index 000000000..2adc6e914 --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/inventory.sql @@ -0,0 +1,70 @@ +-- Copyright 2022 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
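+ +-- PRODUCTS, PRODUCTS1 and PRODUCTS2 below are structurally identical; each IT case +-- mutates its own copy so the tests do not interfere with one another.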
+ +-- Create and populate our test products table using a single insert with many rows +CREATE TABLE DB2INST1.PRODUCTS ( + ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY + (START WITH 101, INCREMENT BY 1) PRIMARY KEY, + NAME VARCHAR(255) NOT NULL, + DESCRIPTION VARCHAR(512), + WEIGHT FLOAT +); + +CREATE TABLE DB2INST1.PRODUCTS1 ( + ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY + (START WITH 101, INCREMENT BY 1) PRIMARY KEY, + NAME VARCHAR(255) NOT NULL, + DESCRIPTION VARCHAR(512), + WEIGHT FLOAT +); + +CREATE TABLE DB2INST1.PRODUCTS2 ( + ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY + (START WITH 101, INCREMENT BY 1) PRIMARY KEY, + NAME VARCHAR(255) NOT NULL, + DESCRIPTION VARCHAR(512), + WEIGHT FLOAT +); + +INSERT INTO DB2INST1.PRODUCTS(NAME,DESCRIPTION,WEIGHT) +VALUES ('scooter','Small 2-wheel scooter',3.14), + ('car battery','12V car battery',8.1), + ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8), + ('hammer','12oz carpenter''s hammer',0.75), + ('hammer','14oz carpenter''s hammer',0.875), + ('hammer','16oz carpenter''s hammer',1.0), + ('rocks','box of assorted rocks',5.3), + ('jacket','water resistent black wind breaker',0.1), + ('spare tire','24 inch spare tire',22.2); + +INSERT INTO DB2INST1.PRODUCTS1(NAME,DESCRIPTION,WEIGHT) +VALUES ('scooter','Small 2-wheel scooter',3.14), + ('car battery','12V car battery',8.1), + ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8), + ('hammer','12oz carpenter''s hammer',0.75), + ('hammer','14oz carpenter''s hammer',0.875), + ('hammer','16oz carpenter''s hammer',1.0), + ('rocks','box of assorted rocks',5.3), + ('jacket','water resistent black wind breaker',0.1), + ('spare tire','24 inch spare tire',22.2); + +INSERT INTO DB2INST1.PRODUCTS2(NAME,DESCRIPTION,WEIGHT) +VALUES ('scooter','Small 2-wheel scooter',3.14), + ('car battery','12V car battery',8.1), + ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8), + ('hammer','12oz carpenter''s hammer',0.75), + ('hammer','14oz carpenter''s hammer',0.875), + ('hammer','16oz carpenter''s hammer',1.0), + ('rocks','box of assorted rocks',5.3), + ('jacket','water resistent black wind breaker',0.1), + ('spare tire','24 inch spare tire',22.2); diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/startup-agent.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/startup-agent.sql new file mode 100644 index 000000000..230ca11ab --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/startup-agent.sql @@ -0,0 +1,14 @@ +-- Copyright 2022 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
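+ +-- Invokes the UDF defined in asncdc_UDF.sql; 'start' launches the asncap capture +-- process for the ASNCDC schema. Other commands use the same form, e.g.: +-- VALUES ASNCDC.ASNCDCSERVICES('status','asncdc');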
+ +VALUES ASNCDC.ASNCDCSERVICES('start','asncdc'); diff --git a/flink-connector-db2-cdc/src/test/resources/db2_server/startup-cdc-demo.sql b/flink-connector-db2-cdc/src/test/resources/db2_server/startup-cdc-demo.sql new file mode 100644 index 000000000..3d502c179 --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/db2_server/startup-cdc-demo.sql @@ -0,0 +1,21 @@ +-- Copyright 2022 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +VALUES ASNCDC.ASNCDCSERVICES('status','asncdc'); + +CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS'); +CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS1'); +CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS2'); +CALL ASNCDC.ADDTABLE('DB2INST1', 'FULL_TYPES'); + +VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc'); diff --git a/flink-connector-db2-cdc/src/test/resources/log4j2-test.properties b/flink-connector-db2-cdc/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..9df04b09f --- /dev/null +++ b/flink-connector-db2-cdc/src/test/resources/log4j2-test.properties @@ -0,0 +1,26 @@ +################################################################################ +# Copyright 2022 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_OUT +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/pom.xml b/pom.xml index 77d46bf48..a7af197af 100644 --- a/pom.xml +++ b/pom.xml @@ -41,6 +41,7 @@ under the License. flink-connector-oceanbase-cdc flink-connector-sqlserver-cdc flink-connector-tidb-cdc + flink-connector-db2-cdc flink-sql-connector-mysql-cdc flink-sql-connector-postgres-cdc flink-sql-connector-mongodb-cdc @@ -267,6 +268,7 @@ under the License. **/*.txt flink-connector-mysql-cdc/src/test/resources/file/*.json + flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile