[db2] Introduce Db2 cdc connector (#450)

pull/1681/head
Hang Ruan 2 years ago committed by Leonard Xu
parent a6fa1e694e
commit 5d005809e9

@@ -0,0 +1,134 @@ flink-connector-db2-cdc/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2022 Ververica Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.ververica</groupId>
<version>2.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-connector-db2-cdc</artifactId>
<name>flink-connector-db2-cdc</name>
<packaging>jar</packaging>
<dependencies>
<!-- Debezium dependencies -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>kafka-log4j-appender</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-db2</artifactId>
<version>${debezium.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>db2</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<!-- db2 driver dependency -->
<dependency>
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
<version>db2jcc4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-test-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<scope>test</scope>
<type>test-jar</type>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,134 @@ flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/Db2Source.java
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2;
import com.ververica.cdc.connectors.db2.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.Validator;
import io.debezium.connector.db2.Db2Connector;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Source for DB2 CDC connector. */
public class Db2Source {
private static final String DB2_DATABASE_SERVER_NAME = "db2_cdc_source";
public static <T> Builder<T> builder() {
return new Builder<>();
}
/**
* Builder for Db2Source.
*
* @param <T> Output type of the source
*/
public static class Builder<T> {
private String hostname;
private int port = 50000;
private String username;
private String password;
private String database;
// Should be in "schema.table" format
private String[] tableList;
private Properties dbzProperties;
private StartupOptions startupOptions = StartupOptions.initial();
private DebeziumDeserializationSchema<T> deserializer;
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", Db2Connector.class.getCanonicalName());
props.setProperty("database.hostname", checkNotNull(hostname));
props.setProperty("database.port", String.valueOf(port));
props.setProperty("database.user", checkNotNull(username));
props.setProperty("database.password", checkNotNull(password));
props.setProperty("database.dbname", checkNotNull(database));
props.setProperty("database.server.name", DB2_DATABASE_SERVER_NAME); // Hard-coded here
props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
if (tableList != null) {
props.setProperty("table.whitelist", String.join(",", tableList));
}
if (dbzProperties != null) {
props.putAll(dbzProperties);
}
switch (startupOptions.startupMode) {
case INITIAL:
props.setProperty("snapshot.mode", "initial");
break;
case LATEST_OFFSET:
props.setProperty("snapshot.mode", "schema_only");
break;
default:
throw new UnsupportedOperationException();
}
return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator());
}
public Builder<T> hostname(String hostname) {
this.hostname = hostname;
return this;
}
public Builder<T> port(int port) {
this.port = port;
return this;
}
public Builder<T> username(String username) {
this.username = username;
return this;
}
public Builder<T> password(String password) {
this.password = password;
return this;
}
public Builder<T> database(String database) {
this.database = database;
return this;
}
public Builder<T> tableList(String... tableList) {
this.tableList = tableList;
return this;
}
public Builder<T> debeziumProperties(Properties debeziumProperties) {
this.dbzProperties = debeziumProperties;
return this;
}
public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
this.deserializer = deserializer;
return this;
}
/** Specifies the startup options. */
public Builder<T> startupOptions(StartupOptions startupOptions) {
this.startupOptions = startupOptions;
return this;
}
}
}
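
For orientation, a minimal DataStream sketch of how the builder above is meant to be wired up; the host, credentials, and table are placeholders, and JsonDebeziumDeserializationSchema ships with the flink-connector-debezium module rather than with this commit.

import com.ververica.cdc.connectors.db2.Db2Source;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Db2SourceExample {
    public static void main(String[] args) throws Exception {
        // Builds a SourceFunction that emits every change event as a Debezium-style JSON string.
        DebeziumSourceFunction<String> source =
                Db2Source.<String>builder()
                        .hostname("localhost") // placeholder connection values
                        .port(50000) // DB2 default port
                        .database("testdb")
                        .tableList("DB2INST1.PRODUCTS") // "schema.table" format
                        .username("db2inst1")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("db2-cdc sketch");
    }
}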

@@ -0,0 +1,122 @@ flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2ReadableMetaData.java
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2.table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/** Defines the supported metadata columns for {@link Db2TableSource}. */
public enum Db2ReadableMetaData {
/** Name of the table that contains the row. */
TABLE_NAME(
"table_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
}
}),
/** Name of the schema that contains the row. */
SCHEMA_NAME(
"schema_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(
sourceStruct.getString(AbstractSourceInfo.SCHEMA_NAME_KEY));
}
}),
/** Name of the database that contains the row. */
DATABASE_NAME(
"database_name",
DataTypes.STRING().notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return StringData.fromString(
sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
}
}),
/**
* Indicates the time at which the change was made in the database. If the record was read from
* a snapshot of the table rather than from the change stream, the value is always 0.
*/
OP_TS(
"op_ts",
DataTypes.TIMESTAMP_LTZ(3).notNull(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(SourceRecord record) {
Struct messageStruct = (Struct) record.value();
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
return TimestampData.fromEpochMillis(
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
});
private final String key;
private final DataType dataType;
private final MetadataConverter converter;
Db2ReadableMetaData(String key, DataType dataType, MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}
public String getKey() {
return key;
}
public DataType getDataType() {
return dataType;
}
public MetadataConverter getConverter() {
return converter;
}
}
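
To show how these keys surface in Flink SQL, here is a hedged DDL sketch (connection values are placeholders) that maps each metadata key above to a virtual column; the strings after METADATA FROM must match Db2ReadableMetaData#getKey.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class Db2MetadataColumnsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // One virtual column per Db2ReadableMetaData constant; placeholder connection values.
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  db_name STRING METADATA FROM 'database_name' VIRTUAL,"
                        + "  schema_name STRING METADATA FROM 'schema_name' VIRTUAL,"
                        + "  table_name STRING METADATA FROM 'table_name' VIRTUAL,"
                        + "  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,"
                        + "  ID INT NOT NULL,"
                        + "  NAME STRING,"
                        + "  PRIMARY KEY (ID) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'db2-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '50000',"
                        + "  'username' = 'db2inst1',"
                        + "  'password' = 'flinkpw',"
                        + "  'database-name' = 'testdb',"
                        + "  'schema-name' = 'DB2INST1',"
                        + "  'table-name' = 'PRODUCTS'"
                        + ")");
    }
}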

@@ -0,0 +1,225 @@ flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSource.java
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.db2.Db2Source;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** TableSource for DB2 CDC connector. */
public class Db2TableSource implements ScanTableSource, SupportsReadingMetadata {
private final ResolvedSchema physicalSchema;
/** Data type that describes the final output of the source. */
protected DataType producedDataType;
private final int port;
private final String hostname;
private final String database;
private final String schemaName;
private final String tableName;
private final String username;
private final String password;
private final ZoneId serverTimeZone;
private final StartupOptions startupOptions;
private final Properties dbzProperties;
/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
public Db2TableSource(
ResolvedSchema physicalSchema,
int port,
String hostname,
String database,
String schemaName,
String tableName,
String username,
String password,
ZoneId serverTimeZone,
Properties dbzProperties,
StartupOptions startupOptions) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = hostname;
this.database = database;
this.schemaName = schemaName;
this.tableName = tableName;
this.username = username;
this.password = password;
this.serverTimeZone = serverTimeZone;
this.dbzProperties = dbzProperties;
this.startupOptions = startupOptions;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.UPDATE_BEFORE)
.build();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
final TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(producedDataType);
DebeziumDeserializationSchema<RowData> deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setMetadataConverters(metadataConverters)
.setResultTypeInfo(typeInfo)
.setServerTimeZone(serverTimeZone)
.build();
Db2Source.Builder<RowData> builder =
Db2Source.<RowData>builder()
.hostname(hostname)
.port(port)
.database(database)
.tableList(schemaName + "." + tableName)
.username(username)
.password(password)
.debeziumProperties(dbzProperties)
.deserializer(deserializer)
.startupOptions(startupOptions);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
private MetadataConverter[] getMetadataConverters() {
if (metadataKeys.isEmpty()) {
return new MetadataConverter[0];
}
return metadataKeys.stream()
.map(
key ->
Stream.of(Db2ReadableMetaData.values())
.filter(m -> m.getKey().equals(key))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(Db2ReadableMetaData::getConverter)
.toArray(MetadataConverter[]::new);
}
@Override
public DynamicTableSource copy() {
Db2TableSource source =
new Db2TableSource(
physicalSchema,
port,
hostname,
database,
schemaName,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
startupOptions);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Db2TableSource)) {
return false;
}
Db2TableSource that = (Db2TableSource) o;
return port == that.port
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
&& Objects.equals(schemaName, that.schemaName)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(serverTimeZone, that.serverTimeZone)
&& Objects.equals(dbzProperties, that.dbzProperties)
&& Objects.equals(metadataKeys, that.metadataKeys);
}
@Override
public int hashCode() {
return Objects.hash(
physicalSchema,
port,
hostname,
database,
schemaName,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
metadataKeys);
}
@Override
public String asSummaryString() {
return "DB2-CDC";
}
@Override
public Map<String, DataType> listReadableMetadata() {
return Stream.of(Db2ReadableMetaData.values())
.collect(
Collectors.toMap(
Db2ReadableMetaData::getKey, Db2ReadableMetaData::getDataType));
}
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
this.metadataKeys = metadataKeys;
this.producedDataType = producedDataType;
}
}

@@ -0,0 +1,179 @@ flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSourceFactory.java
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;
import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
/** Table source factory for DB2 CDC connector. */
public class Db2TableSourceFactory implements DynamicTableSourceFactory {
private static final String IDENTIFIER = "db2-cdc";
private static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
.withDescription("IP address or hostname of the DB2 database server.");
private static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(50000)
.withDescription("Integer port number of the DB2 database server.");
private static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription(
"Username of the DB2 database to use when connecting to the DB2 database server.");
private static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription("Password to use when connecting to the DB2 database server.");
private static final ConfigOption<String> DATABASE_NAME =
ConfigOptions.key("database-name")
.stringType()
.noDefaultValue()
.withDescription("Database name of the DB2 server to monitor.");
private static final ConfigOption<String> SCHEMA_NAME =
ConfigOptions.key("schema-name")
.stringType()
.noDefaultValue()
.withDescription("Schema name of the DB2 database to monitor.");
private static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription(
"Table name of the DB2 database to monitor. This name should not include schema name.");
private static final ConfigOption<String> SERVER_TIME_ZONE =
ConfigOptions.key("server-time-zone")
.stringType()
.defaultValue("UTC")
.withDescription("The session time zone in database server.");
public static final ConfigOption<String> SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.stringType()
.defaultValue("initial")
.withDescription(
"Optional startup mode for Db2 CDC consumer, the valid modes are "
+ "\"initial\", \"latest-offset\"");
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String databaseName = config.get(DATABASE_NAME);
String schemaName = config.get(SCHEMA_NAME);
String tableName = config.get(TABLE_NAME);
int port = config.get(PORT);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
StartupOptions startupOptions = getStartupOptions(config);
return new Db2TableSource(
physicalSchema,
port,
hostname,
databaseName,
schemaName,
tableName,
username,
password,
serverTimeZone,
getDebeziumProperties(context.getCatalogTable().getOptions()),
startupOptions);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(DATABASE_NAME);
options.add(SCHEMA_NAME);
options.add(TABLE_NAME);
options.add(USERNAME);
options.add(PASSWORD);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PORT);
options.add(SERVER_TIME_ZONE);
options.add(SCAN_STARTUP_MODE);
return options;
}
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static StartupOptions getStartupOptions(ReadableConfig config) {
String modeString = config.get(SCAN_STARTUP_MODE);
switch (modeString.toLowerCase()) {
case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
default:
throw new ValidationException(
String.format(
"Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_LATEST,
modeString));
}
}
}
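
Pulling the factory options together, a hedged end-to-end SQL example in the style of the integration tests below; all connection values are placeholders. Options prefixed with "debezium." pass validateExcept and reach the underlying Debezium connector through getDebeziumProperties; poll.interval.ms is just one such pass-through option.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class Db2CdcOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // 'scan.startup.mode' accepts "initial" (default) or "latest-offset".
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  ID INT NOT NULL,"
                        + "  NAME STRING,"
                        + "  WEIGHT DECIMAL(10,3),"
                        + "  PRIMARY KEY (ID) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'db2-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '50000',"
                        + "  'username' = 'db2inst1',"
                        + "  'password' = 'flinkpw',"
                        + "  'database-name' = 'testdb',"
                        + "  'schema-name' = 'DB2INST1',"
                        + "  'table-name' = 'PRODUCTS',"
                        + "  'server-time-zone' = 'UTC',"
                        + "  'scan.startup.mode' = 'initial',"
                        + "  'debezium.poll.interval.ms' = '500'"
                        + ")");
    }
}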

@@ -0,0 +1,27 @@ flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupMode.java
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2.table;
/**
* Startup modes for the Db2 CDC Consumer.
*
* @see StartupOptions
*/
public enum StartupMode {
INITIAL,
LATEST_OFFSET
}

@@ -0,0 +1,70 @@ flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/StartupOptions.java
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2.table;
import java.util.Objects;
/** Debezium startup options. */
public final class StartupOptions {
public final StartupMode startupMode;
/**
* Performs an initial snapshot on the monitored database tables upon first startup, and
* continues to read change events from the database's redo logs.
*/
public static StartupOptions initial() {
return new StartupOptions(StartupMode.INITIAL);
}
/**
* Never performs a snapshot of the monitored database tables upon first startup; reads from
* the end of the change stream instead, which means only changes made after the connector
* was started are captured.
*/
public static StartupOptions latest() {
return new StartupOptions(StartupMode.LATEST_OFFSET);
}
private StartupOptions(StartupMode startupMode) {
this.startupMode = startupMode;
switch (startupMode) {
case INITIAL:
case LATEST_OFFSET:
break;
default:
throw new UnsupportedOperationException(startupMode + " mode is not supported.");
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StartupOptions that = (StartupOptions) o;
return startupMode == that.startupMode;
}
@Override
public int hashCode() {
return Objects.hash(startupMode);
}
}
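
A short sketch of selecting the startup mode on the DataStream builder (placeholder connection values again); per the switch in Db2Source#build, latest() maps to Debezium's snapshot.mode=schema_only, so only changes made after the connector starts are emitted.

import com.ververica.cdc.connectors.db2.Db2Source;
import com.ververica.cdc.connectors.db2.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class Db2StartupOptionsExample {
    public static void main(String[] args) {
        // StartupOptions.initial() (the default) snapshots the tables first, then streams
        // changes; StartupOptions.latest() skips the snapshot entirely.
        DebeziumSourceFunction<String> changesOnly =
                Db2Source.<String>builder()
                        .hostname("localhost")
                        .database("testdb")
                        .tableList("DB2INST1.PRODUCTS")
                        .username("db2inst1")
                        .password("flinkpw")
                        .startupOptions(StartupOptions.latest())
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();
    }
}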

@@ -0,0 +1,15 @@ flink-connector-db2-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
# Copyright 2022 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.ververica.cdc.connectors.db2.table.Db2TableSourceFactory

@@ -0,0 +1,584 @@ flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2SourceTest.java
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.jayway.jsonpath.JsonPath;
import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertDelete;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertInsert;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertRead;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertUpdate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.testcontainers.containers.Db2Container.DB2_PORT;
/** Test for {@link Db2Source} which also heavily tests {@link DebeziumSourceFunction}. */
public class Db2SourceTest extends Db2TestBase {
@Test
public void testConsumingAllEvents() throws Exception {
DebeziumSourceFunction<SourceRecord> source = createDb2Source("DB2INST1.PRODUCTS1");
TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
setupSource(source);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
// start the source
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
List<SourceRecord> records = drain(sourceContext, 9);
assertEquals(9, records.size());
for (int i = 0; i < records.size(); i++) {
assertRead(records.get(i), "ID", 101 + i);
}
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'robot','Toy robot',1.304)"); // 110
records = drain(sourceContext, 1);
assertInsert(records.get(0), "ID", 110);
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS1 VALUES (1001,'roy','old robot',1234.56)"); // 1001
records = drain(sourceContext, 1);
assertInsert(records.get(0), "ID", 1001);
// ---------------------------------------------------------------------------------------------------------------
// Changing the primary key of a row should result in 2 events: DELETE, INSERT
// (the TOMBSTONE event is dropped)
// ---------------------------------------------------------------------------------------------------------------
statement.execute(
"UPDATE DB2INST1.PRODUCTS1 SET ID=2001, DESCRIPTION='really old robot' WHERE ID=1001");
records = drain(sourceContext, 2);
assertDelete(records.get(0), "ID", 1001);
assertInsert(records.get(1), "ID", 2001);
// ---------------------------------------------------------------------------------------------------------------
// Simple UPDATE (with no schema changes)
// ---------------------------------------------------------------------------------------------------------------
statement.execute("UPDATE DB2INST1.PRODUCTS1 SET WEIGHT=1345.67 WHERE ID=2001");
records = drain(sourceContext, 1);
assertUpdate(records.get(0), "ID", 2001);
// ---------------------------------------------------------------------------------------------------------------
// Change our schema with a fully-qualified name; we should still see this event
// ---------------------------------------------------------------------------------------------------------------
// Add a column with default to the 'products' table and explicitly update one record
// ...
statement.execute(
"ALTER TABLE DB2INST1.PRODUCTS1 ADD COLUMN VOLUME FLOAT ADD COLUMN ALIAS VARCHAR(30) NULL");
statement.execute("UPDATE DB2INST1.PRODUCTS1 SET VOLUME=13.5 WHERE ID=2001");
records = drain(sourceContext, 1);
assertUpdate(records.get(0), "ID", 2001);
// cleanup
source.cancel();
source.close();
runThread.sync();
}
}
@Test
public void testCheckpointAndRestore() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
String prevLsn = "";
{
// ---------------------------------------------------------------------------
// Step-1: start the source from empty state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source =
createDb2Source("DB2INST1.PRODUCTS2");
// we use a blocking context to block the source before it emits the last snapshot record
final BlockingSourceContext<SourceRecord> sourceContext =
new BlockingSourceContext<>(8);
// setup source with empty state
setupSource(source, null, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
// wait until consumer is started
int received = drain(sourceContext, 2).size();
assertEquals(2, received);
// we can't perform checkpoint during DB snapshot
assertFalse(
waitForCheckpointLock(
sourceContext.getCheckpointLock(), Duration.ofSeconds(3)));
// unblock the source context to continue the processing
sourceContext.blocker.release();
// wait until the source finishes the database snapshot
List<SourceRecord> records = drain(sourceContext, 9 - received);
assertEquals(9, records.size() + received);
// state is still empty
assertEquals(0, offsetState.list.size());
assertEquals(0, historyState.list.size());
// ---------------------------------------------------------------------------
// Step-2: trigger checkpoint-1 after snapshot finished
// ---------------------------------------------------------------------------
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-1
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("db2_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
String lsn = JsonPath.read(state, "$.sourceOffset.commit_lsn");
assertTrue(lsn.compareTo(prevLsn) > 0);
prevLsn = lsn;
source.cancel();
source.close();
runThread.sync();
}
{
// ---------------------------------------------------------------------------
// Step-3: restore the source from state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source2 =
createDb2Source("DB2INST1.PRODUCTS2");
final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
setupSource(source2, 1L, offsetState, historyState, true, 0, 1);
final CheckedThread runThread2 =
new CheckedThread() {
@Override
public void go() throws Exception {
source2.run(sourceContext2);
}
};
runThread2.start();
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2));
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS2 VALUES (default,'robot','Toy robot',1.304)"); // 110
List<SourceRecord> records = drain(sourceContext2, 1);
assertEquals(1, records.size());
assertInsert(records.get(0), "ID", 110);
// ---------------------------------------------------------------------------
// Step-4: trigger checkpoint-2 during DML operations
// ---------------------------------------------------------------------------
synchronized (sourceContext2.getCheckpointLock()) {
// trigger checkpoint-2
source2.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
}
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("db2_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
String lsn = JsonPath.read(state, "$.sourceOffset.commit_lsn");
assertTrue(lsn.compareTo(prevLsn) > 0);
// execute 2 more DMLs to produce more change events
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS2 VALUES (1001,'roy','old robot',1234.56)"); // 1001
statement.execute("UPDATE DB2INST1.PRODUCTS2 SET WEIGHT=1345.67 WHERE ID=1001");
}
// cancel the source
source2.cancel();
source2.close();
runThread2.sync();
}
{
// ---------------------------------------------------------------------------
// Step-5: restore the source from checkpoint-2
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source3 =
createDb2Source("DB2INST1.PRODUCTS2");
final TestSourceContext<SourceRecord> sourceContext3 = new TestSourceContext<>();
setupSource(source3, 2L, offsetState, historyState, true, 0, 1);
// restart the source
final CheckedThread runThread3 =
new CheckedThread() {
@Override
public void go() throws Exception {
source3.run(sourceContext3);
}
};
runThread3.start();
// consume the unconsumed change events
List<SourceRecord> records = drain(sourceContext3, 2);
assertInsert(records.get(0), "ID", 1001);
assertUpdate(records.get(1), "ID", 1001);
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext3));
// can continue to receive new events
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("DELETE FROM DB2INST1.PRODUCTS2 WHERE ID=1001");
}
records = drain(sourceContext3, 1);
assertDelete(records.get(0), "ID", 1001);
// ---------------------------------------------------------------------------
// Step-6: trigger checkpoint-3 to make sure we can continue to take further checkpoints
// ---------------------------------------------------------------------------
synchronized (sourceContext3.getCheckpointLock()) {
// checkpoint 3
source3.snapshotState(new StateSnapshotContextSynchronousImpl(233, 233));
}
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("db2_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
String lsn = JsonPath.read(state, "$.sourceOffset.commit_lsn");
assertTrue(lsn.compareTo(prevLsn) > 0);
source3.cancel();
source3.close();
runThread3.sync();
}
}
// ------------------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------------------
private DebeziumSourceFunction<SourceRecord> createDb2Source(String tableName) {
return Db2Source.<SourceRecord>builder()
.hostname(DB2_CONTAINER.getHost())
.port(DB2_CONTAINER.getMappedPort(DB2_PORT))
.database(DB2_CONTAINER.getDatabaseName())
.username(DB2_CONTAINER.getUsername())
.password(DB2_CONTAINER.getPassword())
.tableList(tableName)
.deserializer(new ForwardDeserializeSchema())
.build();
}
private <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount)
throws Exception {
List<T> allRecords = new ArrayList<>();
LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
while (allRecords.size() < expectedRecordCount) {
StreamRecord<T> record = queue.poll(100, TimeUnit.SECONDS);
if (record != null) {
allRecords.add(record.getValue());
} else {
throw new RuntimeException(
"Can't receive " + expectedRecordCount + " elements before timeout.");
}
}
return allRecords;
}
private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout)
throws Exception {
final Semaphore semaphore = new Semaphore(0);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(
() -> {
synchronized (checkpointLock) {
semaphore.release();
}
});
boolean result = semaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS);
executor.shutdownNow();
return result;
}
/**
* Wait for a maximum amount of time until the first record is available.
*
* @param timeout the maximum amount of time to wait; must not be negative
* @return {@code true} if records are available, or {@code false} if the timeout occurred and
* no records are available
*/
private boolean waitForAvailableRecords(Duration timeout, TestSourceContext<?> sourceContext)
throws InterruptedException {
long now = System.currentTimeMillis();
long stop = now + timeout.toMillis();
while (System.currentTimeMillis() < stop) {
if (!sourceContext.getCollectedOutputs().isEmpty()) {
break;
}
Thread.sleep(10); // save CPU
}
return !sourceContext.getCollectedOutputs().isEmpty();
}
private static <T> void setupSource(DebeziumSourceFunction<T> source) throws Exception {
setupSource(
source, null, null, null,
true, // enable checkpointing; auto commit should be ignored
0, 1);
}
private static <T, S1, S2> void setupSource(
DebeziumSourceFunction<T> source,
@Nullable Long restoredCheckpointId,
ListState<S1> restoredOffsetState,
ListState<S2> restoredHistoryState,
boolean isCheckpointingEnabled,
int subtaskIndex,
int totalNumSubtasks)
throws Exception {
// run setup procedure in operator life cycle
source.setRuntimeContext(
new MockStreamingRuntimeContext(
isCheckpointingEnabled, totalNumSubtasks, subtaskIndex));
source.initializeState(
new MockFunctionInitializationContext(
restoredCheckpointId,
new MockOperatorStateStore(restoredOffsetState, restoredHistoryState)));
source.open(new Configuration());
}
private static class ForwardDeserializeSchema
implements DebeziumDeserializationSchema<SourceRecord> {
private static final long serialVersionUID = 1L;
@Override
public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
out.collect(record);
}
@Override
public TypeInformation<SourceRecord> getProducedType() {
return TypeInformation.of(SourceRecord.class);
}
}
private static class MockOperatorStateStore implements OperatorStateStore {
private final ListState<?> restoredOffsetListState;
private final ListState<?> restoredHistoryListState;
private MockOperatorStateStore(
ListState<?> restoredOffsetListState, ListState<?> restoredHistoryListState) {
this.restoredOffsetListState = restoredOffsetListState;
this.restoredHistoryListState = restoredHistoryListState;
}
@Override
@SuppressWarnings("unchecked")
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
throws Exception {
if (stateDescriptor.getName().equals(DebeziumSourceFunction.OFFSETS_STATE_NAME)) {
return (ListState<S>) restoredOffsetListState;
} else if (stateDescriptor
.getName()
.equals(DebeziumSourceFunction.HISTORY_RECORDS_STATE_NAME)) {
return (ListState<S>) restoredHistoryListState;
} else {
throw new IllegalStateException("Unknown state.");
}
}
@Override
public <K, V> BroadcastState<K, V> getBroadcastState(
MapStateDescriptor<K, V> stateDescriptor) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor)
throws Exception {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getRegisteredStateNames() {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getRegisteredBroadcastStateNames() {
throw new UnsupportedOperationException();
}
}
private static class MockFunctionInitializationContext
implements FunctionInitializationContext {
private final Long restoredCheckpointId;
private final OperatorStateStore operatorStateStore;
private MockFunctionInitializationContext(
Long restoredCheckpointId, OperatorStateStore operatorStateStore) {
this.restoredCheckpointId = restoredCheckpointId;
this.operatorStateStore = operatorStateStore;
}
@Override
public boolean isRestored() {
return restoredCheckpointId != null;
}
@Override
public OptionalLong getRestoredCheckpointId() {
if (restoredCheckpointId == null) {
return OptionalLong.empty();
}
return OptionalLong.of(restoredCheckpointId);
}
@Override
public OperatorStateStore getOperatorStateStore() {
return operatorStateStore;
}
@Override
public KeyedStateStore getKeyedStateStore() {
throw new UnsupportedOperationException();
}
}
private static class BlockingSourceContext<T> extends TestSourceContext<T> {
private final Semaphore blocker = new Semaphore(0);
private final int expectedCount;
private int currentCount = 0;
private BlockingSourceContext(int expectedCount) {
this.expectedCount = expectedCount;
}
@Override
public void collect(T t) {
super.collect(t);
currentCount++;
if (currentCount == expectedCount) {
try {
// block the source from emitting more records
blocker.acquire();
} catch (InterruptedException e) {
// ignore
}
}
}
}
private static final class TestingListState<T> implements ListState<T> {
private final List<T> list = new ArrayList<>();
private boolean clearCalled = false;
@Override
public void clear() {
list.clear();
clearCalled = true;
}
@Override
public Iterable<T> get() throws Exception {
return list;
}
@Override
public void add(T value) throws Exception {
Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
list.add(value);
}
public List<T> getList() {
return list;
}
boolean isClearCalled() {
return clearCalled;
}
@Override
public void update(List<T> values) throws Exception {
clear();
addAll(values);
}
@Override
public void addAll(List<T> values) throws Exception {
if (values != null) {
values.forEach(
v -> Preconditions.checkNotNull(v, "You cannot add null to a ListState."));
list.addAll(values);
}
}
}
}

@@ -0,0 +1,106 @@ flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/Db2TestBase.java
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Db2Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.stream.Stream;
import static org.junit.Assert.assertNotNull;
/** Base class for testing the DB2 source; it provides a DB2 container with change data capture enabled. */
public class Db2TestBase {
private static final Logger LOG = LoggerFactory.getLogger(Db2TestBase.class);
private static final DockerImageName DEBEZIUM_DOCKER_IMAGE_NAME =
DockerImageName.parse(
new ImageFromDockerfile("custom/db2-cdc:1.4")
.withDockerfile(getFilePath("db2_server/Dockerfile"))
.get())
.asCompatibleSubstituteFor("ibmcom/db2");
private static final Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(LOG);
private static boolean db2AsnAgentRunning = false;
protected static final Db2Container DB2_CONTAINER =
new Db2Container(DEBEZIUM_DOCKER_IMAGE_NAME)
.withDatabaseName("testdb")
.withUsername("db2inst1")
.withPassword("flinkpw")
.withEnv("AUTOCONFIG", "false")
.withEnv("ARCHIVE_LOGS", "true")
.acceptLicense()
.withLogConsumer(logConsumer)
.withLogConsumer(
outputFrame -> {
if (outputFrame
.getUtf8String()
.contains("The asncdc program enable finished")) {
db2AsnAgentRunning = true;
}
});
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(DB2_CONTAINER)).join();
LOG.info("Containers are started.");
LOG.info("Waiting db2 asn agent start...");
while (!db2AsnAgentRunning) {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
LOG.error("unexpected interrupted exception", e);
}
}
LOG.info("Db2 asn agent are started.");
}
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
DB2_CONTAINER.getJdbcUrl(),
DB2_CONTAINER.getUsername(),
DB2_CONTAINER.getPassword());
}
private static Path getFilePath(String resourceFilePath) {
Path path = null;
try {
URL filePath = Db2TestBase.class.getClassLoader().getResource(resourceFilePath);
assertNotNull("Cannot locate " + resourceFilePath, filePath);
path = Paths.get(filePath.toURI());
} catch (URISyntaxException e) {
LOG.error("Cannot get path from URI.", e);
}
return path;
}
}

@@ -0,0 +1,448 @@ flink-connector-db2-cdc/src/test/java/com/ververica/cdc/connectors/db2/table/Db2ConnectorITCase.java
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import com.ververica.cdc.connectors.db2.Db2TestBase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.testcontainers.containers.Db2Container.DB2_PORT;
/** Integration tests for DB2 CDC source. */
public class Db2ConnectorITCase extends Db2TestBase {
private static final Logger LOG = LoggerFactory.getLogger(Db2ConnectorITCase.class);
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());
@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setParallelism(1);
}
@Test
public void testConsumingAllEvents()
throws SQLException, InterruptedException, ExecutionException {
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " ID INT NOT NULL,"
+ " NAME STRING,"
+ " DESCRIPTION STRING,"
+ " WEIGHT DECIMAL(10,3)"
+ ") WITH ("
+ " 'connector' = 'db2-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
DB2_CONTAINER.getHost(),
DB2_CONTAINER.getMappedPort(DB2_PORT),
DB2_CONTAINER.getUsername(),
DB2_CONTAINER.getPassword(),
DB2_CONTAINER.getDatabaseName(),
"DB2INST1",
"PRODUCTS");
String sinkDDL =
"CREATE TABLE sink ("
+ " name STRING,"
+ " weightSum DECIMAL(10,3),"
+ " PRIMARY KEY (name) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result =
tEnv.executeSql(
"INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
waitForSnapshotStarted("sink");
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;");
statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.1' WHERE ID=107;");
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS VALUES (110,'jacket','water resistent white wind breaker',0.2);");
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS VALUES (111,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;");
statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.17' WHERE ID=111;");
statement.execute("DELETE FROM DB2INST1.PRODUCTS WHERE ID=111;");
}
waitForSinkSize("sink", 20);
/*
* <pre>
* The final database table looks like this:
*
* > SELECT * FROM DB2INST1.PRODUCTS;
* +-----+--------------------+---------------------------------------------------------+--------+
* | ID | NAME | DESCRIPTION | WEIGHT |
* +-----+--------------------+---------------------------------------------------------+--------+
* | 101 | scooter | Small 2-wheel scooter | 3.14 |
* | 102 | car battery | 12V car battery | 8.1 |
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
* | 104 | hammer | 12oz carpenter's hammer | 0.75 |
* | 105 | hammer | 14oz carpenter's hammer | 0.875 |
* | 106 | hammer | 18oz carpenter hammer | 1 |
* | 107 | rocks | box of assorted rocks | 5.1 |
* | 108 | jacket | water resistent black wind breaker | 0.1 |
* | 109 | spare tire | 24 inch spare tire | 22.2 |
* | 110 | jacket | new water resistent white wind breaker | 0.5 |
* +-----+--------------------+---------------------------------------------------------+--------+
* </pre>
*/
String[] expected =
new String[] {
"scooter,3.140",
"car battery,8.100",
"12-pack drill bits,0.800",
"hammer,2.625",
"rocks,5.100",
"jacket,0.600",
"spare tire,22.200"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@Test
public void testAllTypes() throws Exception {
// NOTE: db2 is not case-sensitive by default; the schema returned by debezium
// is uppercase, thus we need to use uppercase when defining a db2 table.
String sourceDDL =
String.format(
"CREATE TABLE full_types (\n"
+ " ID INTEGER NOT NULL,\n"
// Debezium cannot track db2 boolean type, see:
// https://issues.redhat.com/browse/DBZ-2587
// + " BOOLEAN_C BOOLEAN NOT NULL,\n"
+ " SMALL_C SMALLINT,\n"
+ " INT_C INTEGER,\n"
+ " BIG_C BIGINT,\n"
+ " REAL_C FLOAT,\n"
+ " DOUBLE_C DOUBLE,\n"
+ " NUMERIC_C DECIMAL(10, 5),\n"
+ " DECIMAL_C DECIMAL(10, 1),\n"
+ " VARCHAR_C STRING,\n"
+ " CHAR_C STRING,\n"
+ " CHARACTER_C STRING,\n"
+ " TIMESTAMP_C TIMESTAMP(3),\n"
+ " DATE_C DATE,\n"
+ " TIME_C TIME(0),\n"
+ " DEFAULT_NUMERIC_C DECIMAL,\n"
+ " TIMESTAMP_PRECISION_C TIMESTAMP(9)\n"
+ ") WITH ("
+ " 'connector' = 'db2-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
DB2_CONTAINER.getHost(),
DB2_CONTAINER.getMappedPort(DB2_PORT),
DB2_CONTAINER.getUsername(),
DB2_CONTAINER.getPassword(),
DB2_CONTAINER.getDatabaseName(),
"DB2INST1",
"FULL_TYPES");
String sinkDDL =
"CREATE TABLE sink (\n"
+ " id INTEGER NOT NULL,\n"
+ " small_c SMALLINT,\n"
+ " int_c INTEGER,\n"
+ " big_c BIGINT,\n"
+ " real_c FLOAT,\n"
+ " double_c DOUBLE,\n"
+ " numeric_c DECIMAL(10, 5),\n"
+ " decimal_c DECIMAL(10, 1),\n"
+ " varchar_c STRING,\n"
+ " char_c STRING,\n"
+ " character_c STRING,\n"
+ " timestamp_c TIMESTAMP(3),\n"
+ " date_c DATE,\n"
+ " time_c TIME(0),\n"
+ " default_numeric_c DECIMAL,\n"
+ " timestamp_precision_c TIMESTAMP(9)\n"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types");
waitForSnapshotStarted("sink");
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE DB2INST1.FULL_TYPES SET SMALL_C=0 WHERE ID=1;");
}
waitForSinkSize("sink", 3);
List<String> expected =
Arrays.asList(
"+I(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)",
"-U(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)",
"+U(1,0,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromLatestOffset() throws Exception {
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " ID INT NOT NULL,"
+ " NAME STRING,"
+ " DESCRIPTION STRING,"
+ " WEIGHT DECIMAL(10,3)"
+ ") WITH ("
+ " 'connector' = 'db2-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s' ,"
+ " 'scan.startup.mode' = 'latest-offset'"
+ ")",
DB2_CONTAINER.getHost(),
DB2_CONTAINER.getMappedPort(DB2_PORT),
DB2_CONTAINER.getUsername(),
DB2_CONTAINER.getPassword(),
DB2_CONTAINER.getDatabaseName(),
"DB2INST1",
"PRODUCTS1");
String sinkDDL =
"CREATE TABLE sink "
+ " WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ") LIKE debezium_source (EXCLUDING OPTIONS)";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// wait for the source to start up; we don't have a better way to wait for it, so use sleep for now
do {
Thread.sleep(5000L);
} while (result.getJobClient().get().getJobStatus().get() != RUNNING);
Thread.sleep(30000L);
LOG.info("Snapshot should be finished; starting to read change events.");
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'jacket','water resistent white wind breaker',0.2)");
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'scooter','Big 2-wheel scooter ',5.18)");
statement.execute(
"UPDATE DB2INST1.PRODUCTS1 SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110");
statement.execute("UPDATE DB2INST1.PRODUCTS1 SET WEIGHT='5.17' WHERE ID=111");
statement.execute("DELETE FROM DB2INST1.PRODUCTS1 WHERE ID=111");
}
waitForSinkSize("sink", 7);
String[] expected =
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@Test
public void testMetadataColumns() throws Throwable {
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " DB_NAME STRING METADATA FROM 'database_name' VIRTUAL,"
+ " SCHEMA_NAME STRING METADATA FROM 'schema_name' VIRTUAL,"
+ " TABLE_NAME STRING METADATA FROM 'table_name' VIRTUAL,"
+ " ID INT NOT NULL,"
+ " NAME STRING,"
+ " DESCRIPTION STRING,"
+ " WEIGHT DECIMAL(10,3),"
+ " PRIMARY KEY (ID) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'db2-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
DB2_CONTAINER.getHost(),
DB2_CONTAINER.getMappedPort(DB2_PORT),
DB2_CONTAINER.getUsername(),
DB2_CONTAINER.getPassword(),
DB2_CONTAINER.getDatabaseName(),
"DB2INST1",
"PRODUCTS2");
String sinkDDL =
"CREATE TABLE sink ("
+ " database_name STRING,"
+ " schema_name STRING,"
+ " table_name STRING,"
+ " id int,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
waitForSnapshotStarted("sink");
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE DB2INST1.PRODUCTS2 SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;");
statement.execute("UPDATE DB2INST1.PRODUCTS2 SET WEIGHT='5.1' WHERE ID=107;");
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS2 VALUES (110,'jacket','water resistent white wind breaker',0.2);");
statement.execute(
"INSERT INTO DB2INST1.PRODUCTS2 VALUES (111,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE DB2INST1.PRODUCTS2 SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;");
statement.execute("UPDATE DB2INST1.PRODUCTS2 SET WEIGHT='5.17' WHERE ID=111;");
statement.execute("DELETE FROM DB2INST1.PRODUCTS2 WHERE ID=111;");
}
waitForSinkSize("sink", 16);
List<String> expected =
Arrays.asList(
"+I(testdb,DB2INST1,PRODUCTS2,101,scooter,Small 2-wheel scooter,3.140)",
"+I(testdb,DB2INST1,PRODUCTS2,102,car battery,12V car battery,8.100)",
"+I(testdb,DB2INST1,PRODUCTS2,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
"+I(testdb,DB2INST1,PRODUCTS2,104,hammer,12oz carpenter's hammer,0.750)",
"+I(testdb,DB2INST1,PRODUCTS2,105,hammer,14oz carpenter's hammer,0.875)",
"+I(testdb,DB2INST1,PRODUCTS2,106,hammer,16oz carpenter's hammer,1.000)",
"+I(testdb,DB2INST1,PRODUCTS2,107,rocks,box of assorted rocks,5.300)",
"+I(testdb,DB2INST1,PRODUCTS2,108,jacket,water resistent black wind breaker,0.100)",
"+I(testdb,DB2INST1,PRODUCTS2,109,spare tire,24 inch spare tire,22.200)",
"+U(testdb,DB2INST1,PRODUCTS2,106,hammer,18oz carpenter hammer,1.000)",
"+U(testdb,DB2INST1,PRODUCTS2,107,rocks,box of assorted rocks,5.100)",
"+I(testdb,DB2INST1,PRODUCTS2,110,jacket,water resistent white wind breaker,0.200)",
"+I(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.180)",
"+U(testdb,DB2INST1,PRODUCTS2,110,jacket,new water resistent white wind breaker,0.500)",
"+U(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.170)",
"-D(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.170)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
Collections.sort(expected);
Collections.sort(actual);
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(1000L);
}
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(1000L);
}
}
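// TestValuesTableFactory throws IllegalArgumentException until the job has
// registered the sink, so an empty result is reported as size 0.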
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
}

@ -0,0 +1,255 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.db2.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.junit.Test;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Test for {@link Db2TableSource} created by {@link Db2TableSourceFactory}. */
public class Db2TableSourceFactoryTest {
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
Arrays.asList(
Column.physical("aaa", DataTypes.INT().notNull()),
Column.physical("bbb", DataTypes.STRING().notNull()),
Column.physical("ccc", DataTypes.DOUBLE()),
Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
Column.physical("eee", DataTypes.TIMESTAMP(3))),
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final ResolvedSchema SCHEMA_WITH_METADATA =
new ResolvedSchema(
Arrays.asList(
Column.physical("aaa", DataTypes.INT().notNull()),
Column.physical("bbb", DataTypes.STRING().notNull()),
Column.physical("ccc", DataTypes.DOUBLE()),
Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
Column.physical("eee", DataTypes.TIMESTAMP(3)),
Column.metadata(
"database_name", DataTypes.STRING(), "database_name", true),
Column.metadata("table_name", DataTypes.STRING(), "table_name", true),
Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true),
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
private static final String MY_DATABASE = "myDB";
private static final String MY_SCHEMA = "flinkuser";
private static final String MY_TABLE = "myTable";
private static final Properties PROPERTIES = new Properties();
@Test
public void testCommonProperties() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(properties, SCHEMA);
Db2TableSource expectedSource =
new Db2TableSource(
getPhysicalSchema(SCHEMA),
50000,
MY_LOCALHOST,
MY_DATABASE,
MY_SCHEMA,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@Test
public void testOptionalProperties() {
Map<String, String> options = getAllOptions();
options.put("port", "50000");
options.put("server-time-zone", "Asia/Shanghai");
options.put("debezium.snapshot.mode", "schema_only");
DynamicTableSource actualSource = createTableSource(options, SCHEMA);
Properties dbzProperties = new Properties();
dbzProperties.put("snapshot.mode", "schema_only");
Db2TableSource expectedSource =
new Db2TableSource(
getPhysicalSchema(SCHEMA),
50000,
MY_LOCALHOST,
MY_DATABASE,
MY_SCHEMA,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("Asia/Shanghai"),
dbzProperties,
StartupOptions.latest());
assertEquals(expectedSource, actualSource);
}
@Test
public void testValidation() {
// validate illegal port
try {
Map<String, String> properties = getAllOptions();
properties.put("port", "123b");
createTableSource(properties, SCHEMA);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t, "Could not parse value '123b' for key 'port'.")
.isPresent());
}
// validate missing required
Factory factory = new Db2TableSourceFactory();
for (ConfigOption<?> requiredOption : factory.requiredOptions()) {
Map<String, String> properties = getAllOptions();
properties.remove(requiredOption.key());
try {
createTableSource(properties, SCHEMA);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"Missing required options are:\n\n" + requiredOption.key())
.isPresent());
}
}
// validate unsupported option
try {
Map<String, String> properties = getAllOptions();
properties.put("unknown", "abc");
createTableSource(properties, SCHEMA);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(t, "Unsupported options:\n\nunknown")
.isPresent());
}
}
@Test
public void testMetadataColumns() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(properties, SCHEMA_WITH_METADATA);
Db2TableSource db2TableSource = (Db2TableSource) actualSource;
db2TableSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name", "table_name", "schema_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = db2TableSource.copy();
Db2TableSource expectedSource =
new Db2TableSource(
SCHEMA_WITH_METADATA,
50000,
MY_LOCALHOST,
MY_DATABASE,
MY_SCHEMA,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
new Properties(),
StartupOptions.initial());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "table_name", "schema_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
db2TableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
DebeziumSourceFunction<RowData> debeziumSourceFunction =
(DebeziumSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(debeziumSourceFunction, expectedSource.producedDataType);
}
private Map<String, String> getAllOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "db2-cdc");
options.put("hostname", MY_LOCALHOST);
options.put("database-name", MY_DATABASE);
options.put("schema-name", MY_SCHEMA);
options.put("table-name", MY_TABLE);
options.put("username", MY_USERNAME);
options.put("password", MY_PASSWORD);
return options;
}
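// Builds the table source the same way the planner would: FactoryUtil resolves
// the 'db2-cdc' factory against a mock resolved catalog table with the given
// schema and options.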
private static DynamicTableSource createTableSource(
Map<String, String> options, ResolvedSchema schema) {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new ResolvedCatalogTable(
CatalogTable.of(
fromResolvedSchema(schema).toSchema(),
"mock source",
new ArrayList<>(),
options),
schema),
new Configuration(),
Db2TableSourceFactoryTest.class.getClassLoader(),
false);
}
}

@ -0,0 +1,36 @@
################################################################################
# Copyright 2022 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
FROM ibmcom/db2:11.5.0.0a
LABEL maintainer="Peter Urbanetz"
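# The scripts added below install the ASN Capture tooling (the asncdc UDF and
# its control tables) and seed the test databases on first container start.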
RUN mkdir -p /asncdctools/src
ADD asncdc_UDF.sql /asncdctools/src
ADD asncdcaddremove.sql /asncdctools/src
ADD asncdctables.sql /asncdctools/src
ADD dbsetup.sh /asncdctools/src
ADD startup-agent.sql /asncdctools/src
ADD startup-cdc-demo.sql /asncdctools/src
ADD inventory.sql /asncdctools/src
ADD column_type_test.sql /asncdctools/src
ADD asncdc.c /asncdctools/src
RUN chmod -R 777 /asncdctools
RUN mkdir /var/custom
ADD cdcsetup.sh /var/custom
RUN chmod -R 777 /var/custom

@ -0,0 +1,178 @@
/*****************************************************************************
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sqludf.h>
#include <sqlstate.h>
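/*
 * Entry point of the external UDF registered by asncdc_UDF.sql as
 * ASNCDC.ASNCDCSERVICES(command, service): it drives the ASN Capture
 * programs (asncap to start; asnccmd for prune/reinit/suspend/resume/
 * status/stop) and returns their console output to SQL as a CLOB.
 */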
void SQL_API_FN asncdcservice(
SQLUDF_VARCHAR *asnCommand, /* input */
SQLUDF_VARCHAR *asnService,
SQLUDF_CLOB *fileData, /* output */
/* null indicators */
SQLUDF_NULLIND *asnCommand_ind, /* input */
SQLUDF_NULLIND *asnService_ind,
SQLUDF_NULLIND *fileData_ind,
SQLUDF_TRAIL_ARGS,
struct sqludf_dbinfo *dbinfo)
{
int fd;
char tmpFileName[] = "/tmp/fileXXXXXX";
fd = mkstemp(tmpFileName);
int strcheck = 0;
char cmdstring[256];
char* szDb2path = getenv("HOME");
char str[20];
int len = 0;
char c;
char *buffer = NULL;
FILE *pidfile;
char dbname[129];
memset(dbname, '\0', 129);
strncpy(dbname, (char *)(dbinfo->dbname), dbinfo->dbnamelen);
dbname[dbinfo->dbnamelen] = '\0';
int pid;
if (strcmp(asnService, "asncdc") == 0)
{
strcheck = sprintf(cmdstring, "pgrep -fx \"%s/sqllib/bin/asncap capture_schema=%s capture_server=%s\" > %s", szDb2path, asnService, dbname, tmpFileName);
int callcheck;
callcheck = system(cmdstring);
pidfile = fopen(tmpFileName, "r");
while ((c = fgetc(pidfile)) != EOF)
{
if (c == '\n')
{
break;
}
len++;
}
buffer = (char *)malloc(sizeof(char) * len);
fseek(pidfile, 0, SEEK_SET);
fread(buffer, sizeof(char), len, pidfile);
fclose(pidfile);
pidfile = fopen(tmpFileName, "w");
if (strcmp(asnCommand, "start") == 0)
{
if (len == 0) // is not running
{
strcheck = sprintf(cmdstring, "%s/sqllib/bin/asncap capture_schema=%s capture_server=%s &", szDb2path, asnService, dbname);
fprintf(pidfile, "start --> %s \n", cmdstring);
callcheck = system(cmdstring);
}
else
{
fprintf(pidfile, "asncap is already running");
}
}
if ((strcmp(asnCommand, "prune") == 0) ||
(strcmp(asnCommand, "reinit") == 0) ||
(strcmp(asnCommand, "suspend") == 0) ||
(strcmp(asnCommand, "resume") == 0) ||
(strcmp(asnCommand, "status") == 0) ||
(strcmp(asnCommand, "stop") == 0))
{
if (len > 0)
{
//buffer[len] = '\0';
//strcheck = sprintf(cmdstring, "/bin/kill -SIGINT %s ", buffer);
//fprintf(pidfile, "stop --> %s", cmdstring);
//callcheck = system(cmdstring);
strcheck = sprintf(cmdstring, "%s/sqllib/bin/asnccmd capture_schema=%s capture_server=%s %s >> %s", szDb2path, asnService, dbname, asnCommand, tmpFileName);
//fprintf(pidfile, "%s --> %s \n", cmdstring, asnCommand);
callcheck = system(cmdstring);
}
else
{
fprintf(pidfile, "asncap is not running");
}
}
free(buffer); /* buffer was only needed to measure the pid file contents */
fclose(pidfile);
}
/* system(cmdstring); */
int rc = 0;
long fileSize = 0;
size_t readCnt = 0;
FILE *f = NULL;
f = fopen(tmpFileName, "r");
if (!f)
{
strcpy(SQLUDF_MSGTX, "Could not open file ");
strncat(SQLUDF_MSGTX, tmpFileName,
SQLUDF_MSGTEXT_LEN - strlen(SQLUDF_MSGTX) - 1);
strncpy(SQLUDF_STATE, "38100", SQLUDF_SQLSTATE_LEN);
return;
}
rc = fseek(f, 0, SEEK_END);
if (rc)
{
sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
strncpy(SQLUDF_STATE, "38101", SQLUDF_SQLSTATE_LEN);
return;
}
/* verify the file size */
fileSize = ftell(f);
if (fileSize > fileData->length)
{
strcpy(SQLUDF_MSGTX, "File too large");
strncpy(SQLUDF_STATE, "38102", SQLUDF_SQLSTATE_LEN);
return;
}
/* go to the beginning and read the entire file */
rc = fseek(f, 0, 0);
if (rc)
{
sprintf(SQLUDF_MSGTX, "fseek() failed with rc = %d", rc);
strncpy(SQLUDF_STATE, "38103", SQLUDF_SQLSTATE_LEN);
return;
}
readCnt = fread(fileData->data, 1, fileSize, f);
if (readCnt != fileSize)
{
/* raise a warning that something weird is going on */
sprintf(SQLUDF_MSGTX, "Could not read entire file "
"(%zu vs %ld)",
readCnt, fileSize);
strncpy(SQLUDF_STATE, "01H10", SQLUDF_SQLSTATE_LEN);
*fileData_ind = -1;
}
else
{
fileData->length = readCnt;
*fileData_ind = 0;
}
// close and remove the temporary file
fclose(f);
rc = remove(tmpFileName);
}

@ -0,0 +1,30 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
DROP SPECIFIC FUNCTION ASNCDC.asncdcservice;
CREATE FUNCTION ASNCDC.ASNCDCSERVICES(command VARCHAR(6), service VARCHAR(8))
RETURNS CLOB(100K)
SPECIFIC asncdcservice
EXTERNAL NAME 'asncdc!asncdcservice'
LANGUAGE C
PARAMETER STYLE SQL
DBINFO
DETERMINISTIC
NOT FENCED
RETURNS NULL ON NULL INPUT
NO SQL
NO EXTERNAL ACTION
NO SCRATCHPAD
ALLOW PARALLEL
NO FINAL CALL;
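-- Usage sketch: the function is invoked with VALUES; the second argument is
-- the capture schema (these scripts use 'asncdc'), and the result CLOB holds
-- the captured program output, e.g.:
--   VALUES ASNCDC.ASNCDCSERVICES('start', 'asncdc');
--   VALUES ASNCDC.ASNCDCSERVICES('status', 'asncdc');
--   VALUES ASNCDC.ASNCDCSERVICES('stop', 'asncdc');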

@ -0,0 +1,204 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- Define ASNCDC.REMOVETABLE() and ASNCDC.ADDTABLE()
-- ASNCDC.ADDTABLE() puts a table in CDC mode, making the ASN Capture server collect changes for the table
-- ASNCDC.REMOVETABLE() makes the ASN Capture server stop collecting changes for that table
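-- Usage sketch (mirrors startup-cdc-demo.sql; the schema/table names are just
-- examples):
--   CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS');
--   CALL ASNCDC.REMOVETABLE('DB2INST1', 'PRODUCTS');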
--#SET TERMINATOR @
CREATE OR REPLACE PROCEDURE ASNCDC.REMOVETABLE(
in tableschema VARCHAR(128),
in tablename VARCHAR(128)
)
LANGUAGE SQL
P1:
BEGIN
DECLARE stmtSQL VARCHAR(2048);
DECLARE SQLCODE INT;
DECLARE SQLSTATE CHAR(5);
DECLARE RC_SQLCODE INT DEFAULT 0;
DECLARE RC_SQLSTATE CHAR(5) DEFAULT '00000';
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION, SQLWARNING, NOT FOUND VALUES (SQLCODE, SQLSTATE) INTO RC_SQLCODE, RC_SQLSTATE;
-- delete ASN.IBMSNAP_PRUNCNTL entries / source
SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_PRUNCNTL WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
EXECUTE IMMEDIATE stmtSQL;
-- delete ASN.IBMSNAP_Register entries / source
SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_REGISTER WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_TABLE=''' || tablename || '''';
EXECUTE IMMEDIATE stmtSQL;
-- drop CD Table / source
SET stmtSQL = 'DROP TABLE ASNCDC.CDC_' ||
tableschema || '_' || tablename ;
EXECUTE IMMEDIATE stmtSQL;
-- delete ASN.IBMSNAP_SUBS_COLS entries /target
SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_COLS WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
EXECUTE IMMEDIATE stmtSQL;
-- delete ASN.IBMSNAP_SUBS_MEMBR entries / target
SET stmtSQL = 'DELETE FROM ASNCDC.IBMSNAP_SUBS_MEMBR WHERE TARGET_OWNER=''' || tableschema || ''' AND TARGET_TABLE=''' || tablename || '''';
EXECUTE IMMEDIATE stmtSQL;
-- delete ASN.IBMQREP_COLVERSION
SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_COLVERSION col WHERE EXISTS (SELECT * FROM ASNCDC.IBMQREP_TABVERSION tab WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || ''' AND col.TABLEID1 = tab.TABLEID1 AND col.TABLEID2 = tab.TABLEID2)';
EXECUTE IMMEDIATE stmtSQL;
-- delete ASN.IBMQREP_TABVERSION
SET stmtSQL = 'DELETE FROM ASNCDC.IBMQREP_TABVERSION WHERE SOURCE_OWNER=''' || tableschema || ''' AND SOURCE_NAME=''' || tablename || '''';
EXECUTE IMMEDIATE stmtSQL;
SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE NONE';
EXECUTE IMMEDIATE stmtSQL;
END P1@
--#SET TERMINATOR ;
--#SET TERMINATOR @
CREATE OR REPLACE PROCEDURE ASNCDC.ADDTABLE(
in tableschema VARCHAR(128),
in tablename VARCHAR(128)
)
LANGUAGE SQL
P1:
BEGIN
DECLARE SQLSTATE CHAR(5);
DECLARE stmtSQL VARCHAR(2048);
SET stmtSQL = 'ALTER TABLE ' || tableschema || '.' || tablename || ' DATA CAPTURE CHANGES';
EXECUTE IMMEDIATE stmtSQL;
SET stmtSQL = 'CREATE TABLE ASNCDC.CDC_' ||
tableschema || '_' || tablename ||
' AS ( SELECT ' ||
' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_COMMITSEQ, ' ||
' CAST('''' AS VARCHAR ( 16 ) FOR BIT DATA) AS IBMSNAP_INTENTSEQ, ' ||
' CAST ('''' AS CHAR(1)) ' ||
' AS IBMSNAP_OPERATION, t.* FROM ' || tableschema || '.' || tablename || ' as t ) WITH NO DATA ORGANIZE BY ROW ';
EXECUTE IMMEDIATE stmtSQL;
SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
tableschema || '_' || tablename ||
' ALTER COLUMN IBMSNAP_COMMITSEQ SET NOT NULL';
EXECUTE IMMEDIATE stmtSQL;
SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
tableschema || '_' || tablename ||
' ALTER COLUMN IBMSNAP_INTENTSEQ SET NOT NULL';
EXECUTE IMMEDIATE stmtSQL;
SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
tableschema || '_' || tablename ||
' ALTER COLUMN IBMSNAP_OPERATION SET NOT NULL';
EXECUTE IMMEDIATE stmtSQL;
SET stmtSQL = 'CREATE UNIQUE INDEX ASNCDC.IXCDC_' ||
tableschema || '_' || tablename ||
' ON ASNCDC.CDC_' ||
tableschema || '_' || tablename ||
' ( IBMSNAP_COMMITSEQ ASC, IBMSNAP_INTENTSEQ ASC ) PCTFREE 0 MINPCTUSED 0';
EXECUTE IMMEDIATE stmtSQL;
SET stmtSQL = 'ALTER TABLE ASNCDC.CDC_' ||
tableschema || '_' || tablename ||
' VOLATILE CARDINALITY';
EXECUTE IMMEDIATE stmtSQL;
SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_REGISTER (SOURCE_OWNER, SOURCE_TABLE, ' ||
'SOURCE_VIEW_QUAL, GLOBAL_RECORD, SOURCE_STRUCTURE, SOURCE_CONDENSED, ' ||
'SOURCE_COMPLETE, CD_OWNER, CD_TABLE, PHYS_CHANGE_OWNER, ' ||
'PHYS_CHANGE_TABLE, CD_OLD_SYNCHPOINT, CD_NEW_SYNCHPOINT, ' ||
'DISABLE_REFRESH, CCD_OWNER, CCD_TABLE, CCD_OLD_SYNCHPOINT, ' ||
'SYNCHPOINT, SYNCHTIME, CCD_CONDENSED, CCD_COMPLETE, ARCH_LEVEL, ' ||
'DESCRIPTION, BEFORE_IMG_PREFIX, CONFLICT_LEVEL, ' ||
'CHG_UPD_TO_DEL_INS, CHGONLY, RECAPTURE, OPTION_FLAGS, ' ||
'STOP_ON_ERROR, STATE, STATE_INFO ) VALUES( ' ||
'''' || tableschema || ''', ' ||
'''' || tablename || ''', ' ||
'0, ' ||
'''N'', ' ||
'1, ' ||
'''Y'', ' ||
'''Y'', ' ||
'''ASNCDC'', ' ||
'''CDC_' || tableschema || '_' || tablename || ''', ' ||
'''ASNCDC'', ' ||
'''CDC_' || tableschema || '_' || tablename || ''', ' ||
'null, ' ||
'null, ' ||
'0, ' ||
'null, ' ||
'null, ' ||
'null, ' ||
'null, ' ||
'null, ' ||
'null, ' ||
'null, ' ||
'''0801'', ' ||
'null, ' ||
'null, ' ||
'''0'', ' ||
'''Y'', ' ||
'''N'', ' ||
'''Y'', ' ||
'''NNNN'', ' ||
'''Y'', ' ||
'''A'',' ||
'null ) ';
EXECUTE IMMEDIATE stmtSQL;
SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' ||
'TARGET_SERVER, ' ||
'TARGET_OWNER, ' ||
'TARGET_TABLE, ' ||
'SYNCHTIME, ' ||
'SYNCHPOINT, ' ||
'SOURCE_OWNER, ' ||
'SOURCE_TABLE, ' ||
'SOURCE_VIEW_QUAL, ' ||
'APPLY_QUAL, ' ||
'SET_NAME, ' ||
'CNTL_SERVER , ' ||
'TARGET_STRUCTURE , ' ||
'CNTL_ALIAS , ' ||
'PHYS_CHANGE_OWNER , ' ||
'PHYS_CHANGE_TABLE , ' ||
'MAP_ID ' ||
') VALUES ( ' ||
'''KAFKA'', ' ||
'''' || tableschema || ''', ' ||
'''' || tablename || ''', ' ||
'NULL, ' ||
'NULL, ' ||
'''' || tableschema || ''', ' ||
'''' || tablename || ''', ' ||
'0, ' ||
'''KAFKAQUAL'', ' ||
'''SET001'', ' ||
' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
'8, ' ||
' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
'''ASNCDC'', ' ||
'''CDC_' || tableschema || '_' || tablename || ''', ' ||
' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' ||
' )';
EXECUTE IMMEDIATE stmtSQL;
END P1@
--#SET TERMINATOR ;

@ -0,0 +1,492 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ARCH_LEVEL codes: 1021 = Db2 Version 10.2.0, 1150 = Db2 Version 11.5.0
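-- The tables below are the ASN Capture control tables (IBMSNAP_* / IBMQREP_*)
-- that the capture programs maintain and the Debezium Db2 connector reads.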
CREATE TABLE ASNCDC.IBMQREP_COLVERSION(
LSN VARCHAR( 16) FOR BIT DATA NOT NULL,
TABLEID1 SMALLINT NOT NULL,
TABLEID2 SMALLINT NOT NULL,
POSITION SMALLINT NOT NULL,
NAME VARCHAR(128) NOT NULL,
TYPE SMALLINT NOT NULL,
LENGTH INTEGER NOT NULL,
NULLS CHAR( 1) NOT NULL,
DEFAULT VARCHAR(1536),
CODEPAGE INTEGER,
SCALE INTEGER,
VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT )
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMQREP_COLVERSIOX
ON ASNCDC.IBMQREP_COLVERSION(
LSN ASC,
TABLEID1 ASC,
TABLEID2 ASC,
POSITION ASC);
CREATE INDEX ASNCDC.IX2COLVERSION
ON ASNCDC.IBMQREP_COLVERSION(
TABLEID1 ASC,
TABLEID2 ASC);
CREATE TABLE ASNCDC.IBMQREP_TABVERSION(
LSN VARCHAR( 16) FOR BIT DATA NOT NULL,
TABLEID1 SMALLINT NOT NULL,
TABLEID2 SMALLINT NOT NULL,
VERSION INTEGER NOT NULL,
SOURCE_OWNER VARCHAR(128) NOT NULL,
SOURCE_NAME VARCHAR(128) NOT NULL,
VERSION_TIME TIMESTAMP NOT NULL WITH DEFAULT )
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMQREP_TABVERSIOX
ON ASNCDC.IBMQREP_TABVERSION(
LSN ASC,
TABLEID1 ASC,
TABLEID2 ASC,
VERSION ASC);
CREATE INDEX ASNCDC.IX2TABVERSION
ON ASNCDC.IBMQREP_TABVERSION(
TABLEID1 ASC,
TABLEID2 ASC);
CREATE INDEX ASNCDC.IX3TABVERSION
ON ASNCDC.IBMQREP_TABVERSION(
SOURCE_OWNER ASC,
SOURCE_NAME ASC);
CREATE TABLE ASNCDC.IBMSNAP_APPLEVEL(
ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021')
ORGANIZE BY ROW;
INSERT INTO ASNCDC.IBMSNAP_APPLEVEL(ARCH_LEVEL) VALUES (
'1021');
CREATE TABLE ASNCDC.IBMSNAP_CAPMON(
MONITOR_TIME TIMESTAMP NOT NULL,
RESTART_TIME TIMESTAMP NOT NULL,
CURRENT_MEMORY INT NOT NULL,
CD_ROWS_INSERTED INT NOT NULL,
RECAP_ROWS_SKIPPED INT NOT NULL,
TRIGR_ROWS_SKIPPED INT NOT NULL,
CHG_ROWS_SKIPPED INT NOT NULL,
TRANS_PROCESSED INT NOT NULL,
TRANS_SPILLED INT NOT NULL,
MAX_TRANS_SIZE INT NOT NULL,
LOCKING_RETRIES INT NOT NULL,
JRN_LIB CHAR( 10),
JRN_NAME CHAR( 10),
LOGREADLIMIT INT NOT NULL,
CAPTURE_IDLE INT NOT NULL,
SYNCHTIME TIMESTAMP NOT NULL,
CURRENT_LOG_TIME TIMESTAMP NOT NULL WITH DEFAULT ,
LAST_EOL_TIME TIMESTAMP,
RESTART_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT ,
CURRENT_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT ,
RESTART_MAXCMTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL WITH DEFAULT ,
LOGREAD_API_TIME INT,
NUM_LOGREAD_CALLS INT,
NUM_END_OF_LOGS INT,
LOGRDR_SLEEPTIME INT,
NUM_LOGREAD_F_CALLS INT,
TRANS_QUEUED INT,
NUM_WARNTXS INT,
NUM_WARNLOGAPI INT)
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPMONX
ON ASNCDC.IBMSNAP_CAPMON(
MONITOR_TIME ASC);
ALTER TABLE ASNCDC.IBMSNAP_CAPMON VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_CAPPARMS(
RETENTION_LIMIT INT,
LAG_LIMIT INT,
COMMIT_INTERVAL INT,
PRUNE_INTERVAL INT,
TRACE_LIMIT INT,
MONITOR_LIMIT INT,
MONITOR_INTERVAL INT,
MEMORY_LIMIT SMALLINT,
REMOTE_SRC_SERVER CHAR( 18),
AUTOPRUNE CHAR( 1),
TERM CHAR( 1),
AUTOSTOP CHAR( 1),
LOGREUSE CHAR( 1),
LOGSTDOUT CHAR( 1),
SLEEP_INTERVAL SMALLINT,
CAPTURE_PATH VARCHAR(1040),
STARTMODE VARCHAR( 10),
LOGRDBUFSZ INT NOT NULL WITH DEFAULT 256,
ARCH_LEVEL CHAR( 4) NOT NULL WITH DEFAULT '1021',
COMPATIBILITY CHAR( 4) NOT NULL WITH DEFAULT '1021')
ORGANIZE BY ROW;
INSERT INTO ASNCDC.IBMSNAP_CAPPARMS(
RETENTION_LIMIT,
LAG_LIMIT,
COMMIT_INTERVAL,
PRUNE_INTERVAL,
TRACE_LIMIT,
MONITOR_LIMIT,
MONITOR_INTERVAL,
MEMORY_LIMIT,
SLEEP_INTERVAL,
AUTOPRUNE,
TERM,
AUTOSTOP,
LOGREUSE,
LOGSTDOUT,
CAPTURE_PATH,
STARTMODE,
COMPATIBILITY)
VALUES (
10080,
10080,
30,
300,
10080,
10080,
300,
32,
5,
'Y',
'Y',
'N',
'N',
'N',
NULL,
'WARMSI',
'1021'
);
CREATE TABLE ASNCDC.IBMSNAP_CAPSCHEMAS (
CAP_SCHEMA_NAME VARCHAR(128 OCTETS) NOT NULL
)
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_CAPSCHEMASX
ON ASNCDC.IBMSNAP_CAPSCHEMAS
(CAP_SCHEMA_NAME ASC);
INSERT INTO ASNCDC.IBMSNAP_CAPSCHEMAS(CAP_SCHEMA_NAME) VALUES (
'ASNCDC');
CREATE TABLE ASNCDC.IBMSNAP_CAPTRACE(
OPERATION CHAR( 8) NOT NULL,
TRACE_TIME TIMESTAMP NOT NULL,
DESCRIPTION VARCHAR(1024) NOT NULL)
ORGANIZE BY ROW;
CREATE INDEX ASNCDC.IBMSNAP_CAPTRACEX
ON ASNCDC.IBMSNAP_CAPTRACE(
TRACE_TIME ASC);
ALTER TABLE ASNCDC.IBMSNAP_CAPTRACE VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_PRUNCNTL(
TARGET_SERVER CHAR(18) NOT NULL,
TARGET_OWNER VARCHAR(128) NOT NULL,
TARGET_TABLE VARCHAR(128) NOT NULL,
SYNCHTIME TIMESTAMP,
SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
SOURCE_OWNER VARCHAR(128) NOT NULL,
SOURCE_TABLE VARCHAR(128) NOT NULL,
SOURCE_VIEW_QUAL SMALLINT NOT NULL,
APPLY_QUAL CHAR( 18) NOT NULL,
SET_NAME CHAR( 18) NOT NULL,
CNTL_SERVER CHAR( 18) NOT NULL,
TARGET_STRUCTURE SMALLINT NOT NULL,
CNTL_ALIAS CHAR( 8),
PHYS_CHANGE_OWNER VARCHAR(128),
PHYS_CHANGE_TABLE VARCHAR(128),
MAP_ID VARCHAR(10) NOT NULL)
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX
ON ASNCDC.IBMSNAP_PRUNCNTL(
SOURCE_OWNER ASC,
SOURCE_TABLE ASC,
SOURCE_VIEW_QUAL ASC,
APPLY_QUAL ASC,
SET_NAME ASC,
TARGET_SERVER ASC,
TARGET_TABLE ASC,
TARGET_OWNER ASC);
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNCNTLX1
ON ASNCDC.IBMSNAP_PRUNCNTL(
MAP_ID ASC);
CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX2
ON ASNCDC.IBMSNAP_PRUNCNTL(
PHYS_CHANGE_OWNER ASC,
PHYS_CHANGE_TABLE ASC);
CREATE INDEX ASNCDC.IBMSNAP_PRUNCNTLX3
ON ASNCDC.IBMSNAP_PRUNCNTL(
APPLY_QUAL ASC,
SET_NAME ASC,
TARGET_SERVER ASC);
ALTER TABLE ASNCDC.IBMSNAP_PRUNCNTL VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_PRUNE_LOCK(
DUMMY CHAR( 1))
ORGANIZE BY ROW;
CREATE TABLE ASNCDC.IBMSNAP_PRUNE_SET(
TARGET_SERVER CHAR( 18) NOT NULL,
APPLY_QUAL CHAR( 18) NOT NULL,
SET_NAME CHAR( 18) NOT NULL,
SYNCHTIME TIMESTAMP,
SYNCHPOINT VARCHAR( 16) FOR BIT DATA NOT NULL)
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_PRUNE_SETX
ON ASNCDC.IBMSNAP_PRUNE_SET(
TARGET_SERVER ASC,
APPLY_QUAL ASC,
SET_NAME ASC);
ALTER TABLE ASNCDC.IBMSNAP_PRUNE_SET VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_REGISTER(
SOURCE_OWNER VARCHAR(128) NOT NULL,
SOURCE_TABLE VARCHAR(128) NOT NULL,
SOURCE_VIEW_QUAL SMALLINT NOT NULL,
GLOBAL_RECORD CHAR( 1) NOT NULL,
SOURCE_STRUCTURE SMALLINT NOT NULL,
SOURCE_CONDENSED CHAR( 1) NOT NULL,
SOURCE_COMPLETE CHAR( 1) NOT NULL,
CD_OWNER VARCHAR(128),
CD_TABLE VARCHAR(128),
PHYS_CHANGE_OWNER VARCHAR(128),
PHYS_CHANGE_TABLE VARCHAR(128),
CD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
CD_NEW_SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
DISABLE_REFRESH SMALLINT NOT NULL,
CCD_OWNER VARCHAR(128),
CCD_TABLE VARCHAR(128),
CCD_OLD_SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
SYNCHTIME TIMESTAMP,
CCD_CONDENSED CHAR( 1),
CCD_COMPLETE CHAR( 1),
ARCH_LEVEL CHAR( 4) NOT NULL,
DESCRIPTION CHAR(254),
BEFORE_IMG_PREFIX VARCHAR( 4),
CONFLICT_LEVEL CHAR( 1),
CHG_UPD_TO_DEL_INS CHAR( 1),
CHGONLY CHAR( 1),
RECAPTURE CHAR( 1),
OPTION_FLAGS CHAR( 4) NOT NULL,
STOP_ON_ERROR CHAR( 1) WITH DEFAULT 'Y',
STATE CHAR( 1) WITH DEFAULT 'I',
STATE_INFO CHAR( 8))
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_REGISTERX
ON ASNCDC.IBMSNAP_REGISTER(
SOURCE_OWNER ASC,
SOURCE_TABLE ASC,
SOURCE_VIEW_QUAL ASC);
CREATE INDEX ASNCDC.IBMSNAP_REGISTERX1
ON ASNCDC.IBMSNAP_REGISTER(
PHYS_CHANGE_OWNER ASC,
PHYS_CHANGE_TABLE ASC);
CREATE INDEX ASNCDC.IBMSNAP_REGISTERX2
ON ASNCDC.IBMSNAP_REGISTER(
GLOBAL_RECORD ASC);
ALTER TABLE ASNCDC.IBMSNAP_REGISTER VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_RESTART(
MAX_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL,
MAX_COMMIT_TIME TIMESTAMP NOT NULL,
MIN_INFLIGHTSEQ VARCHAR( 16) FOR BIT DATA NOT NULL,
CURR_COMMIT_TIME TIMESTAMP NOT NULL,
CAPTURE_FIRST_SEQ VARCHAR( 16) FOR BIT DATA NOT NULL)
ORGANIZE BY ROW;
CREATE TABLE ASNCDC.IBMSNAP_SIGNAL(
SIGNAL_TIME TIMESTAMP NOT NULL WITH DEFAULT ,
SIGNAL_TYPE VARCHAR( 30) NOT NULL,
SIGNAL_SUBTYPE VARCHAR( 30),
SIGNAL_INPUT_IN VARCHAR(500),
SIGNAL_STATE CHAR( 1) NOT NULL,
SIGNAL_LSN VARCHAR( 16) FOR BIT DATA)
DATA CAPTURE CHANGES
ORGANIZE BY ROW;
CREATE INDEX ASNCDC.IBMSNAP_SIGNALX
ON ASNCDC.IBMSNAP_SIGNAL(
SIGNAL_TIME ASC);
ALTER TABLE ASNCDC.IBMSNAP_SIGNAL VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_SUBS_COLS(
APPLY_QUAL CHAR( 18) NOT NULL,
SET_NAME CHAR( 18) NOT NULL,
WHOS_ON_FIRST CHAR( 1) NOT NULL,
TARGET_OWNER VARCHAR(128) NOT NULL,
TARGET_TABLE VARCHAR(128) NOT NULL,
COL_TYPE CHAR( 1) NOT NULL,
TARGET_NAME VARCHAR(128) NOT NULL,
IS_KEY CHAR( 1) NOT NULL,
COLNO SMALLINT NOT NULL,
EXPRESSION VARCHAR(1024) NOT NULL)
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_COLSX
ON ASNCDC.IBMSNAP_SUBS_COLS(
APPLY_QUAL ASC,
SET_NAME ASC,
WHOS_ON_FIRST ASC,
TARGET_OWNER ASC,
TARGET_TABLE ASC,
TARGET_NAME ASC);
ALTER TABLE ASNCDC.IBMSNAP_SUBS_COLS VOLATILE CARDINALITY;
--CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_EVENTX
--ON ASNCDC.IBMSNAP_SUBS_EVENT(
--EVENT_NAME ASC,
--EVENT_TIME ASC);
--ALTER TABLE ASNCDC.IBMSNAP_SUBS_EVENT VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_SUBS_MEMBR(
APPLY_QUAL CHAR( 18) NOT NULL,
SET_NAME CHAR( 18) NOT NULL,
WHOS_ON_FIRST CHAR( 1) NOT NULL,
SOURCE_OWNER VARCHAR(128) NOT NULL,
SOURCE_TABLE VARCHAR(128) NOT NULL,
SOURCE_VIEW_QUAL SMALLINT NOT NULL,
TARGET_OWNER VARCHAR(128) NOT NULL,
TARGET_TABLE VARCHAR(128) NOT NULL,
TARGET_CONDENSED CHAR( 1) NOT NULL,
TARGET_COMPLETE CHAR( 1) NOT NULL,
TARGET_STRUCTURE SMALLINT NOT NULL,
PREDICATES VARCHAR(1024),
MEMBER_STATE CHAR( 1),
TARGET_KEY_CHG CHAR( 1) NOT NULL,
UOW_CD_PREDICATES VARCHAR(1024),
JOIN_UOW_CD CHAR( 1),
LOADX_TYPE SMALLINT,
LOADX_SRC_N_OWNER VARCHAR( 128),
LOADX_SRC_N_TABLE VARCHAR(128))
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_MEMBRX
ON ASNCDC.IBMSNAP_SUBS_MEMBR(
APPLY_QUAL ASC,
SET_NAME ASC,
WHOS_ON_FIRST ASC,
SOURCE_OWNER ASC,
SOURCE_TABLE ASC,
SOURCE_VIEW_QUAL ASC,
TARGET_OWNER ASC,
TARGET_TABLE ASC);
ALTER TABLE ASNCDC.IBMSNAP_SUBS_MEMBR VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_SUBS_SET(
APPLY_QUAL CHAR( 18) NOT NULL,
SET_NAME CHAR( 18) NOT NULL,
SET_TYPE CHAR( 1) NOT NULL,
WHOS_ON_FIRST CHAR( 1) NOT NULL,
ACTIVATE SMALLINT NOT NULL,
SOURCE_SERVER CHAR( 18) NOT NULL,
SOURCE_ALIAS CHAR( 8),
TARGET_SERVER CHAR( 18) NOT NULL,
TARGET_ALIAS CHAR( 8),
STATUS SMALLINT NOT NULL,
LASTRUN TIMESTAMP NOT NULL,
REFRESH_TYPE CHAR( 1) NOT NULL,
SLEEP_MINUTES INT,
EVENT_NAME CHAR( 18),
LASTSUCCESS TIMESTAMP,
SYNCHPOINT VARCHAR( 16) FOR BIT DATA,
SYNCHTIME TIMESTAMP,
CAPTURE_SCHEMA VARCHAR(128) NOT NULL,
TGT_CAPTURE_SCHEMA VARCHAR(128),
FEDERATED_SRC_SRVR VARCHAR( 18),
FEDERATED_TGT_SRVR VARCHAR( 18),
JRN_LIB CHAR( 10),
JRN_NAME CHAR( 10),
OPTION_FLAGS CHAR( 4) NOT NULL,
COMMIT_COUNT SMALLINT,
MAX_SYNCH_MINUTES SMALLINT,
AUX_STMTS SMALLINT NOT NULL,
ARCH_LEVEL CHAR( 4) NOT NULL)
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_SETX
ON ASNCDC.IBMSNAP_SUBS_SET(
APPLY_QUAL ASC,
SET_NAME ASC,
WHOS_ON_FIRST ASC);
ALTER TABLE ASNCDC.IBMSNAP_SUBS_SET VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_SUBS_STMTS(
APPLY_QUAL CHAR( 18) NOT NULL,
SET_NAME CHAR( 18) NOT NULL,
WHOS_ON_FIRST CHAR( 1) NOT NULL,
BEFORE_OR_AFTER CHAR( 1) NOT NULL,
STMT_NUMBER SMALLINT NOT NULL,
EI_OR_CALL CHAR( 1) NOT NULL,
SQL_STMT VARCHAR(1024),
ACCEPT_SQLSTATES VARCHAR( 50))
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_SUBS_STMTSX
ON ASNCDC.IBMSNAP_SUBS_STMTS(
APPLY_QUAL ASC,
SET_NAME ASC,
WHOS_ON_FIRST ASC,
BEFORE_OR_AFTER ASC,
STMT_NUMBER ASC);
ALTER TABLE ASNCDC.IBMSNAP_SUBS_STMTS VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_UOW(
IBMSNAP_UOWID CHAR( 10) FOR BIT DATA NOT NULL,
IBMSNAP_COMMITSEQ VARCHAR( 16) FOR BIT DATA NOT NULL,
IBMSNAP_LOGMARKER TIMESTAMP NOT NULL,
IBMSNAP_AUTHTKN VARCHAR(30) NOT NULL,
IBMSNAP_AUTHID VARCHAR(128) NOT NULL,
IBMSNAP_REJ_CODE CHAR( 1) NOT NULL WITH DEFAULT ,
IBMSNAP_APPLY_QUAL CHAR( 18) NOT NULL WITH DEFAULT )
ORGANIZE BY ROW;
CREATE UNIQUE INDEX ASNCDC.IBMSNAP_UOWX
ON ASNCDC.IBMSNAP_UOW(
IBMSNAP_COMMITSEQ ASC,
IBMSNAP_LOGMARKER ASC);
ALTER TABLE ASNCDC.IBMSNAP_UOW VOLATILE CARDINALITY;
CREATE TABLE ASNCDC.IBMSNAP_CAPENQ (
LOCK_NAME CHAR(9 OCTETS)
)
ORGANIZE BY ROW
DATA CAPTURE NONE
COMPRESS NO;

@ -0,0 +1,33 @@
#!/bin/bash
################################################################################
# Copyright 2022 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
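# One-time setup guard: the marker file asncdc.nlk records that the CDC
# tooling has already been installed, so container restarts skip the setup.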
if [ ! -f /asncdctools/src/asncdc.nlk ]; then
rc=1
echo "waiting for db2inst1 exists ."
while [ "$rc" -ne 0 ]
do
sleep 5
id db2inst1
rc=$?
echo '.'
done
su -c "/asncdctools/src/dbsetup.sh $DBNAME" - db2inst1
fi
touch /asncdctools/src/asncdc.nlk
echo "The asncdc program enable finished"

@ -0,0 +1,42 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: column_type_test
-- ----------------------------------------------------------------------------------------------------------------
CREATE TABLE DB2INST1.FULL_TYPES (
ID INTEGER NOT NULL,
SMALL_C SMALLINT,
INT_C INTEGER,
BIG_C BIGINT,
REAL_C REAL,
DOUBLE_C DOUBLE,
NUMERIC_C NUMERIC(10, 5),
DECIMAL_C DECIMAL(10, 1),
VARCHAR_C VARCHAR(200),
CHAR_C CHAR,
CHARACTER_C CHAR(3),
TIMESTAMP_C TIMESTAMP,
DATE_C DATE,
TIME_C TIME,
DEFAULT_NUMERIC_C NUMERIC,
TIMESTAMP_PRECISION_C TIMESTAMP(9),
PRIMARY KEY (ID)
);
INSERT INTO DB2INST1.FULL_TYPES VALUES (
1, 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443,
'Hello World', 'a', 'abc', '2020-07-17 18:00:22.123', '2020-07-17', '18:00:22', 500,
'2020-07-17 18:00:22.123456789');

@ -0,0 +1,70 @@
#!/bin/bash
################################################################################
# Copyright 2022 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
echo "Compile ASN tool ..."
cd /asncdctools/src
/opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
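# bldrtn compiles asncdc.c into a shared library that Db2 loads as the
# external routine behind ASNCDC.ASNCDCSERVICES (see asncdc_UDF.sql).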
DBNAME=$1
DB2DIR=/opt/ibm/db2/V11.5
rc=1
echo "Waiting for DB2 start ( $DBNAME ) ."
while [ "$rc" -ne 0 ]
do
sleep 5
db2 connect to $DBNAME
rc=$?
echo '.'
done
# enable metacatalog read via JDBC
cd $HOME/sqllib/bnd
db2 bind db2schema.bnd blocking all grant public sqlerror continue
# do a backup and restart the db
db2 backup db $DBNAME to /dev/null
db2 restart db $DBNAME
db2 connect to $DBNAME
cp /asncdctools/src/asncdc /database/config/db2inst1/sqllib/function
chmod 777 /database/config/db2inst1/sqllib/function
# add UDF / start stop asncap
db2 -tvmf /asncdctools/src/asncdc_UDF.sql
# create asntables
db2 -tvmf /asncdctools/src/asncdctables.sql
# add UDF / add remove asntables
db2 -tvmf /asncdctools/src/asncdcaddremove.sql
# create sample table and data
db2 -tvmf /asncdctools/src/inventory.sql
db2 -tvmf /asncdctools/src/column_type_test.sql
db2 -tvmf /asncdctools/src/startup-agent.sql
sleep 10
db2 -tvmf /asncdctools/src/startup-cdc-demo.sql
echo "db2 setup done"

@ -0,0 +1,70 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- Create and populate our test products tables, one multi-row insert per table
CREATE TABLE DB2INST1.PRODUCTS (
ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY
(START WITH 101, INCREMENT BY 1) PRIMARY KEY,
NAME VARCHAR(255) NOT NULL,
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT
);
CREATE TABLE DB2INST1.PRODUCTS1 (
ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY
(START WITH 101, INCREMENT BY 1) PRIMARY KEY,
NAME VARCHAR(255) NOT NULL,
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT
);
CREATE TABLE DB2INST1.PRODUCTS2 (
ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY
(START WITH 101, INCREMENT BY 1) PRIMARY KEY,
NAME VARCHAR(255) NOT NULL,
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT
);
INSERT INTO DB2INST1.PRODUCTS(NAME,DESCRIPTION,WEIGHT)
VALUES ('scooter','Small 2-wheel scooter',3.14),
('car battery','12V car battery',8.1),
('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
('hammer','12oz carpenter''s hammer',0.75),
('hammer','14oz carpenter''s hammer',0.875),
('hammer','16oz carpenter''s hammer',1.0),
('rocks','box of assorted rocks',5.3),
('jacket','water resistent black wind breaker',0.1),
('spare tire','24 inch spare tire',22.2);
INSERT INTO DB2INST1.PRODUCTS1(NAME,DESCRIPTION,WEIGHT)
VALUES ('scooter','Small 2-wheel scooter',3.14),
('car battery','12V car battery',8.1),
('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
('hammer','12oz carpenter''s hammer',0.75),
('hammer','14oz carpenter''s hammer',0.875),
('hammer','16oz carpenter''s hammer',1.0),
('rocks','box of assorted rocks',5.3),
('jacket','water resistent black wind breaker',0.1),
('spare tire','24 inch spare tire',22.2);
INSERT INTO DB2INST1.PRODUCTS2(NAME,DESCRIPTION,WEIGHT)
VALUES ('scooter','Small 2-wheel scooter',3.14),
('car battery','12V car battery',8.1),
('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),
('hammer','12oz carpenter''s hammer',0.75),
('hammer','14oz carpenter''s hammer',0.875),
('hammer','16oz carpenter''s hammer',1.0),
('rocks','box of assorted rocks',5.3),
('jacket','water resistent black wind breaker',0.1),
('spare tire','24 inch spare tire',22.2);

@ -0,0 +1,14 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
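-- VALUES invokes the scalar UDF directly; this starts the ASN Capture agent
-- for the 'asncdc' capture schema.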
VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');

@ -0,0 +1,21 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
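-- Check the capture agent, put the demo tables into CDC mode, then
-- reinitialize the agent so it picks up the newly registered tables.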
VALUES ASNCDC.ASNCDCSERVICES('status','asncdc');
CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS');
CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS1');
CALL ASNCDC.ADDTABLE('DB2INST1', 'PRODUCTS2');
CALL ASNCDC.ADDTABLE('DB2INST1', 'FULL_TYPES');
VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');

@ -0,0 +1,26 @@
################################################################################
# Copyright 2022 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level=OFF
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_OUT
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n

@ -41,6 +41,7 @@ under the License.
<module>flink-connector-oceanbase-cdc</module>
<module>flink-connector-sqlserver-cdc</module>
<module>flink-connector-tidb-cdc</module>
<module>flink-connector-db2-cdc</module>
<module>flink-sql-connector-mysql-cdc</module>
<module>flink-sql-connector-postgres-cdc</module>
<module>flink-sql-connector-mongodb-cdc</module>
@ -267,6 +268,7 @@ under the License.
<!-- Tests -->
<exclude>**/*.txt</exclude>
<exclude>flink-connector-mysql-cdc/src/test/resources/file/*.json</exclude>
<exclude>flink-connector-db2-cdc/src/test/resources/db2_server/Dockerfile</exclude>
</excludes>
</configuration>
</plugin>
