[oracle] Add oracle cdc connector

This commit does not contain secrets
pull/512/head
jpatel 4 years ago committed by Leonard Xu
parent 0ea7a9b1c7
commit 3ea40efcc2

@ -34,12 +34,12 @@ under the License.
 <dependency>
     <groupId>io.debezium</groupId>
     <artifactId>debezium-api</artifactId>
-    <version>${debezium.version}</version>
+    <version>1.5.0.Final</version>
 </dependency>
 <dependency>
     <groupId>io.debezium</groupId>
     <artifactId>debezium-embedded</artifactId>
-    <version>${debezium.version}</version>
+    <version>1.5.0.Final</version>
     <exclusions>
         <exclusion>
             <artifactId>kafka-log4j-appender</artifactId>

@ -0,0 +1,171 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.alibaba.ververica</groupId>
<version>1.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-connector-oracle-cdc</artifactId>
<name>flink-connector-oracle-cdc</name>
<packaging>jar</packaging>
<dependencies>
<!-- Debezium dependencies -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>kafka-log4j-appender</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>1.5.0.Final</version>
</dependency>
<!-- test dependencies on Debezium -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-test-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>1.5.0.Final</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- test dependencies on Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.oracle.ojdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- test dependencies on TestContainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>oracle-xe</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<!-- tests will have log4j as the default logging framework available -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
<argLine>-Xms256m -Xmx2048m -Dlog4j.configurationFile=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle;
import com.alibaba.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffset;
import io.debezium.connector.oracle.OracleConnector;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A builder to build a SourceFunction which can read a snapshot and continue to consume
* LogMiner change events.
*/
public class OracleSource {
private static final String DATABASE_SERVER_NAME = "oracle_logminer";
public static <T> Builder<T> builder() {
return new Builder<>();
}
/** Builder class of {@link OracleSource}. */
public static class Builder<T> {
private int port = 1521; // default Oracle port
private String hostname;
private String database;
private String username;
private String password;
private String serverTimeZone;
private String[] tableList;
private String[] schemaList;
private Properties dbzProperties;
private StartupOptions startupOptions = StartupOptions.initial();
private DebeziumDeserializationSchema<T> deserializer;
public Builder<T> hostname(String hostname) {
this.hostname = hostname;
return this;
}
/** Integer port number of the Oracle database server. */
public Builder<T> port(int port) {
this.port = port;
return this;
}
/**
* Name of the Oracle database to connect to. The value is passed to the Debezium connector
* as {@code database.dbname}.
*/
public Builder<T> database(String database) {
this.database = database;
return this;
}
/**
* An optional list of regular expressions that match fully-qualified table identifiers for
* tables to be monitored; any table not included in the list will be excluded from
* monitoring. Each identifier is of the form schemaName.tableName. By default the
* connector will monitor every non-system table in each monitored schema.
*/
public Builder<T> tableList(String... tableList) {
this.tableList = tableList;
return this;
}
/**
* An optional list of regular expressions that match schema names to be monitored; any
* schema name not included in the whitelist will be excluded from monitoring. By default
* all non-system schemas will be monitored.
*/
public Builder<T> schemaList(String... schemaList) {
this.schemaList = schemaList;
return this;
}
/** Name of the Oracle user to use when connecting to the Oracle database server. */
public Builder<T> username(String username) {
this.username = username;
return this;
}
/** Password to use when connecting to the Oracle database server. */
public Builder<T> password(String password) {
this.password = password;
return this;
}
/**
* The session time zone of the database server, e.g. "America/Los_Angeles". It controls how
* the TIMESTAMP type in Oracle is converted to STRING. See more at
* https://debezium.io/documentation/reference/1.5/connectors/oracle.html#oracle-temporal-types
*/
public Builder<T> serverTimeZone(String timeZone) {
this.serverTimeZone = timeZone;
return this;
}
/** The Debezium Oracle connector properties. For example, "snapshot.mode". */
public Builder<T> debeziumProperties(Properties properties) {
this.dbzProperties = properties;
return this;
}
/**
* The deserializer used to convert from consumed {@link
* org.apache.kafka.connect.source.SourceRecord}.
*/
public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
this.deserializer = deserializer;
return this;
}
/** Specifies the startup options. */
public Builder<T> startupOptions(StartupOptions startupOptions) {
this.startupOptions = startupOptions;
return this;
}
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
// Logical name that identifies and provides a namespace for the particular Oracle
// database server being monitored. The logical name should be unique across all other
// connectors, since it is used as a prefix for all Kafka topic names emanating from
// this connector. Only alphanumeric characters and underscores should be used.
props.setProperty("database.server.name", DATABASE_SERVER_NAME);
props.setProperty("database.hostname", checkNotNull(hostname));
props.setProperty("database.user", checkNotNull(username));
props.setProperty("database.password", checkNotNull(password));
props.setProperty("database.port", String.valueOf(port));
props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
props.setProperty("database.dbname", checkNotNull(database));
if (schemaList != null) {
props.setProperty("schema.whitelist", String.join(",", schemaList));
}
if (tableList != null) {
props.setProperty("table.include.list", String.join(",", tableList));
}
if (serverTimeZone != null) {
props.setProperty("database.serverTimezone", serverTimeZone);
}
DebeziumOffset specificOffset = null;
switch (startupOptions.startupMode) {
case INITIAL:
props.setProperty("snapshot.mode", "initial");
break;
case LATEST_OFFSET:
props.setProperty("snapshot.mode", "schema_only");
break;
default:
throw new UnsupportedOperationException();
}
if (dbzProperties != null) {
dbzProperties.forEach(props::put);
}
return new DebeziumSourceFunction<>(deserializer, props, specificOffset);
}
}
}
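
For orientation, a minimal DataStream usage sketch of the builder above (not part of this commit): all connection values are placeholders, and it assumes the StringDebeziumDeserializationSchema shipped with flink-connector-debezium is on the classpath.

// Sketch only: hostname, credentials, and database values are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.ververica.cdc.connectors.oracle.OracleSource;
import com.alibaba.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class OracleSourceExample {
    public static void main(String[] args) throws Exception {
        DebeziumSourceFunction<String> source =
                OracleSource.<String>builder()
                        .hostname("localhost")           // Oracle host
                        .port(1521)                      // default listener port
                        .database("XE")                  // becomes Debezium's database.dbname
                        .schemaList("DEBEZIUM")          // schemas to capture
                        .tableList("DEBEZIUM.PRODUCTS")  // schemaName.tableName
                        .username("system")
                        .password("oracle")
                        .startupOptions(StartupOptions.initial()) // snapshot, then LogMiner
                        .deserializer(new StringDebeziumDeserializationSchema())
                        .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("Oracle CDC sketch");
    }
}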

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.alibaba.ververica.cdc.connectors.oracle.OracleSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import java.time.ZoneId;
import java.util.Objects;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link DynamicTableSource} that describes how to create an Oracle redo-log (LogMiner)
* source from a logical description.
*/
public class OracleTableSource implements ScanTableSource {
private final TableSchema physicalSchema;
private final int port;
private final String hostname;
private final String database;
private final String username;
private final String password;
private final String tableName;
private final String schemaName;
private final ZoneId serverTimeZone;
private final Properties dbzProperties;
private final StartupOptions startupOptions;
public OracleTableSource(
TableSchema physicalSchema,
int port,
String hostname,
String database,
String tableName,
String schemaName,
String username,
String password,
ZoneId serverTimeZone,
Properties dbzProperties,
StartupOptions startupOptions) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
this.database = checkNotNull(database);
this.tableName = checkNotNull(tableName);
this.schemaName = checkNotNull(schemaName);
this.username = checkNotNull(username);
this.password = checkNotNull(password);
this.serverTimeZone = serverTimeZone;
this.dbzProperties = dbzProperties;
this.startupOptions = startupOptions;
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer =
new RowDataDebeziumDeserializeSchema(
rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
OracleSource.Builder<RowData> builder =
OracleSource.<RowData>builder()
.hostname(hostname)
.port(port)
.database(database)
.tableList(schemaName + "." + tableName)
.schemaList(schemaName)
.username(username)
.password(password)
.serverTimeZone(serverTimeZone.toString())
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
@Override
public DynamicTableSource copy() {
return new OracleTableSource(
physicalSchema,
port,
hostname,
database,
tableName,
schemaName,
username,
password,
serverTimeZone,
dbzProperties,
startupOptions);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OracleTableSource that = (OracleTableSource) o;
return port == that.port
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(schemaName, that.schemaName)
&& Objects.equals(serverTimeZone, that.serverTimeZone)
&& Objects.equals(dbzProperties, that.dbzProperties)
&& Objects.equals(startupOptions, that.startupOptions);
}
@Override
public int hashCode() {
return Objects.hash(
physicalSchema,
port,
hostname,
database,
username,
password,
tableName,
schemaName,
serverTimeZone,
dbzProperties,
startupOptions);
}
@Override
public String asSummaryString() {
return "Oracle-CDC";
}
}

@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import com.alibaba.ververica.cdc.debezium.table.DebeziumOptions;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;
import static com.alibaba.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
/**
* Factory for creating configured instance of {@link
* com.alibaba.ververica.cdc.connectors.oracle.table.OracleTableSource}.
*/
public class OracleTableSourceFactory implements DynamicTableSourceFactory {
private static final String IDENTIFIER = "oracle-cdc";
private static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
.withDescription("IP address or hostname of the Oracle database server.");
private static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(1521)
.withDescription("Integer port number of the Oracle database server.");
private static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription(
        "Name of the Oracle database user to use when connecting to the Oracle database server.");
private static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription(
        "Password to use when connecting to the Oracle database server.");
private static final ConfigOption<String> DATABASE_NAME =
ConfigOptions.key("database-name")
.stringType()
.noDefaultValue()
.withDescription("Database name of the Oracle server to monitor.");
private static final ConfigOption<String> SCHEMA_NAME =
ConfigOptions.key("schema-name")
.stringType()
.noDefaultValue()
.withDescription("Schema name of the Oracle database to monitor.");
private static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("Table name of the Oracle database to monitor.");
private static final ConfigOption<String> SERVER_TIME_ZONE =
ConfigOptions.key("server-time-zone")
.stringType()
.defaultValue("UTC")
.withDescription("The session time zone of the database server.");
public static final ConfigOption<String> SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.stringType()
.defaultValue("initial")
.withDescription(
        "Optional startup mode for the Oracle CDC consumer; valid enumerations are "
                + "\"initial\" and \"latest-offset\".");
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
String schemaName = config.get(SCHEMA_NAME);
int port = config.get(PORT);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
StartupOptions startupOptions = getStartupOptions(config);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new OracleTableSource(
physicalSchema,
port,
hostname,
databaseName,
tableName,
schemaName,
username,
password,
serverTimeZone,
getDebeziumProperties(context.getCatalogTable().getOptions()),
startupOptions);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(USERNAME);
options.add(PASSWORD);
options.add(DATABASE_NAME);
options.add(TABLE_NAME);
options.add(SCHEMA_NAME);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PORT);
options.add(SERVER_TIME_ZONE);
options.add(SCAN_STARTUP_MODE);
return options;
}
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static StartupOptions getStartupOptions(ReadableConfig config) {
String modeString = config.get(SCAN_STARTUP_MODE);
switch (modeString.toLowerCase()) {
case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
default:
throw new ValidationException(
String.format(
"Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_LATEST,
modeString));
}
}
}
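
The factory above backs the "oracle-cdc" identifier in SQL DDL. A sketch of how its options line up with a CREATE TABLE statement (not part of this commit; every connection value is a placeholder, and the TableEnvironment setup assumes the Flink 1.12-era blink planner API):

// Sketch only: every WITH value below is a placeholder.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OracleCdcSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance()
                                .useBlinkPlanner()
                                .inStreamingMode()
                                .build());
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  ID INT,"
                        + "  NAME STRING,"
                        + "  DESCRIPTION STRING,"
                        + "  WEIGHT DECIMAL(10, 3)"
                        + ") WITH ("
                        + "  'connector' = 'oracle-cdc',"   // factoryIdentifier() above
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '1521',"
                        + "  'username' = 'system',"
                        + "  'password' = 'oracle',"
                        + "  'database-name' = 'XE',"
                        + "  'schema-name' = 'DEBEZIUM',"
                        + "  'table-name' = 'PRODUCTS',"
                        + "  'scan.startup.mode' = 'initial'"
                        + ")");
        tEnv.executeSql("SELECT * FROM products").print();
    }
}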

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle.table;
/**
* Startup modes for the Oracle CDC Consumer.
*
* @see StartupOptions
*/
public enum StartupMode {
INITIAL,
LATEST_OFFSET,
}

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle.table;
import java.util.Objects;
/** Debezium startup options. */
public final class StartupOptions {
public final StartupMode startupMode;
/**
* Performs an initial snapshot on the monitored database tables upon first startup, and
* continues to read the latest changes from LogMiner.
*/
public static StartupOptions initial() {
return new StartupOptions(StartupMode.INITIAL);
}
/**
* Never performs a snapshot of the monitored database tables on first startup; reads only
* from the end of the LogMiner stream, i.e. captures only changes made since the connector
* was started.
*/
public static StartupOptions latest() {
return new StartupOptions(StartupMode.LATEST_OFFSET);
}
private StartupOptions(StartupMode startupMode) {
this.startupMode = startupMode;
switch (startupMode) {
case INITIAL:
case LATEST_OFFSET:
break;
default:
throw new UnsupportedOperationException(startupMode + " mode is not supported.");
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StartupOptions that = (StartupOptions) o;
return startupMode == that.startupMode;
}
@Override
public int hashCode() {
return Objects.hash(startupMode);
}
}

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.alibaba.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory
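
This service file is what makes the connector discoverable: Flink resolves table factories through the standard Java ServiceLoader mechanism and matches them by identifier. A simplified sketch of that lookup (assuming the file registers the class under org.apache.flink.table.factories.Factory, which is what Flink's FactoryUtil scans; the real lookup performs additional validation):

// Simplified SPI lookup sketch; not how FactoryUtil is actually implemented.
import java.util.ServiceLoader;

import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.Factory;

public final class FactoryLookupSketch {
    public static DynamicTableSourceFactory find(String identifier) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if (factory instanceof DynamicTableSourceFactory
                    && identifier.equals(factory.factoryIdentifier())) {
                return (DynamicTableSourceFactory) factory; // e.g. OracleTableSourceFactory
            }
        }
        throw new IllegalArgumentException("No factory found for identifier: " + identifier);
    }
}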

@ -0,0 +1,894 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.alibaba.ververica.cdc.connectors.oracle.utils.UniqueDatabase;
import com.alibaba.ververica.cdc.connectors.utils.TestSourceContext;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.jayway.jsonpath.JsonPath;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.containers.Container;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/** Tests for {@link OracleSource} which also heavily tests {@link DebeziumSourceFunction}. */
public class OracleSourceTest extends OracleTestBase {
private final UniqueDatabase database =
new UniqueDatabase(ORACLE_CONTAINER, "debezium", "system", "oracle", "inventory_1");
@Before
public void before() throws Exception {
Container.ExecResult execResult1 =
ORACLE_CONTAINER.execInContainer("chmod", "+x", "/etc/logminer_conf.sh");
execResult1.getStdout();
execResult1.getStderr();
// Container.ExecResult execResult12 = ORACLE_CONTAINER.execInContainer("chmod", "-R",
// "777", "/u01/app/oracle/");
// execResult12 = ORACLE_CONTAINER.execInContainer("su - oracle");
// execResult12.getStdout();
// execResult12.getStderr();
Container.ExecResult execResult =
ORACLE_CONTAINER.execInContainer("/bin/sh", "-c", "/etc/logminer_conf.sh");
execResult.getStdout();
execResult.getStderr();
// database.createAndInitialize();
}
@Test
public void testConsumingAllEvents() throws Exception {
DebeziumSourceFunction<SourceRecord> source = createOracleLogminerSource();
TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
setupSource(source);
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
// start the source
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
List<SourceRecord> records = drain(sourceContext, 9);
assertEquals(9, records.size());
for (int i = 0; i < records.size(); i++) {
// assertInsert(records.get(i), "ID", 101 + i);
}
statement.execute(
"INSERT INTO debezium.products VALUES (110,'robot','Toy robot',1.304)"); // 110
records = drain(sourceContext, 1);
// assertInsert(records.get(0), "id", 110);
statement.execute(
"INSERT INTO debezium.products VALUES (1001,'roy','old robot',1234.56)"); // 1001
records = drain(sourceContext, 1);
// assertInsert(records.get(0), "id", 1001);
// ---------------------------------------------------------------------------------------------------------------
// Changing the primary key of a row should result in 2 events: DELETE, INSERT
// (TOMBSTONE is dropped)
// ---------------------------------------------------------------------------------------------------------------
statement.execute(
"UPDATE debezium.products SET id=2001, description='really old robot' WHERE id=1001");
records = drain(sourceContext, 2);
// assertDelete(records.get(0), "id", 1001);
// assertInsert(records.get(1), "id", 2001);
// ---------------------------------------------------------------------------------------------------------------
// Simple UPDATE (with no schema changes)
// ---------------------------------------------------------------------------------------------------------------
statement.execute("UPDATE debezium.products SET weight=1345.67 WHERE id=2001");
records = drain(sourceContext, 1);
// assertUpdate(records.get(0), "id", 2001);
// ---------------------------------------------------------------------------------------------------------------
// Change our schema with a fully-qualified name; we should still see this event
// ---------------------------------------------------------------------------------------------------------------
// Add a column with default to the 'products' table and explicitly update one record
// ...
statement.execute(
String.format("ALTER TABLE %s.products ADD volume FLOAT", "debezium"));
statement.execute("UPDATE debezium.products SET volume=13.5 WHERE id=2001");
records = drain(sourceContext, 1);
// assertUpdate(records.get(0), "id", 2001);
// cleanup
source.cancel();
source.close();
runThread.sync();
}
}
@Test
public void testCheckpointAndRestore() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
int prevPos = 0;
{
// ---------------------------------------------------------------------------
// Step-1: start the source from empty state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source = createOracleLogminerSource();
// we use a blocking context to block the source before it emits the last snapshot record
final BlockingSourceContext<SourceRecord> sourceContext =
new BlockingSourceContext<>(8);
// setup source with empty state
setupSource(source, false, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
// wait until consumer is started
int received = drain(sourceContext, 2).size();
assertEquals(2, received);
// we can't perform checkpoint during DB snapshot
assertFalse(
waitForCheckpointLock(
sourceContext.getCheckpointLock(), Duration.ofSeconds(3)));
// unblock the source context to continue the processing
sourceContext.blocker.release();
// wait until the source finishes the database snapshot
List<SourceRecord> records = drain(sourceContext, 9 - received);
assertEquals(9, records.size() + received);
// state is still empty
assertEquals(0, offsetState.list.size());
assertEquals(0, historyState.list.size());
// ---------------------------------------------------------------------------
// Step-2: trigger checkpoint-1 after snapshot finished
// ---------------------------------------------------------------------------
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-1
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
assertHistoryState(historyState);
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server"));
// assertEquals("mysql-bin.000003", JsonPath.read(state, "$.sourceOffset.file"));
assertFalse(state.contains("row"));
assertFalse(state.contains("server_id"));
assertFalse(state.contains("event"));
// int pos = JsonPath.read(state, "$.sourceOffset.pos");
// assertTrue(pos > prevPos);
// prevPos = pos;
source.cancel();
source.close();
runThread.sync();
}
{
// ---------------------------------------------------------------------------
// Step-3: restore the source from state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source2 = createOracleLogminerSource();
final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
setupSource(source2, true, offsetState, historyState, true, 0, 1);
final CheckedThread runThread2 =
new CheckedThread() {
@Override
public void go() throws Exception {
source2.run(sourceContext2);
}
};
runThread2.start();
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2));
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO debezium.products VALUES (110,'robot','Toy robot',1.304)"); // 110
List<SourceRecord> records = drain(sourceContext2, 1);
assertEquals(1, records.size());
/// assertInsert(records.get(0), "id", 110);
// ---------------------------------------------------------------------------
// Step-4: trigger checkpoint-2 during DML operations
// ---------------------------------------------------------------------------
synchronized (sourceContext2.getCheckpointLock()) {
// trigger checkpoint-2
source2.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
}
assertHistoryState(historyState); // assert the DDL is stored in the history state
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server"));
// execute 2 more DMLs to generate more change events
statement.execute(
"INSERT INTO debezium.products VALUES (1001,'roy','old robot',1234.56)"); // 1001
statement.execute("UPDATE debezium.products SET weight=1345.67 WHERE id=1001");
}
// cancel the source
source2.cancel();
source2.close();
runThread2.sync();
}
{
// ---------------------------------------------------------------------------
// Step-5: restore the source from checkpoint-2
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source3 = createOracleLogminerSource();
final TestSourceContext<SourceRecord> sourceContext3 = new TestSourceContext<>();
setupSource(source3, true, offsetState, historyState, true, 0, 1);
// restart the source
final CheckedThread runThread3 =
new CheckedThread() {
@Override
public void go() throws Exception {
source3.run(sourceContext3);
}
};
runThread3.start();
// consume the remaining unconsumed change events
List<SourceRecord> records = drain(sourceContext3, 2);
// assertInsert(records.get(0), "id", 1001);
// assertUpdate(records.get(1), "id", 1001);
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext3));
// can continue to receive new events
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("DELETE FROM debezium.products WHERE id=1001");
}
records = drain(sourceContext3, 1);
// assertDelete(records.get(0), "id", 1001);
// ---------------------------------------------------------------------------
// Step-6: trigger checkpoint-3 to make sure we can continue to do further checkpoints
// ---------------------------------------------------------------------------
synchronized (sourceContext3.getCheckpointLock()) {
// checkpoint 3
source3.snapshotState(new StateSnapshotContextSynchronousImpl(233, 233));
}
assertHistoryState(historyState); // assert the DDL is stored in the history state
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server"));
source3.cancel();
source3.close();
runThread3.sync();
}
{
// ---------------------------------------------------------------------------
// Step-7: restore the source from checkpoint-3
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source4 = createOracleLogminerSource();
final TestSourceContext<SourceRecord> sourceContext4 = new TestSourceContext<>();
setupSource(source4, true, offsetState, historyState, true, 0, 1);
// restart the source
final CheckedThread runThread4 =
new CheckedThread() {
@Override
public void go() throws Exception {
source4.run(sourceContext4);
}
};
runThread4.start();
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext4));
// ---------------------------------------------------------------------------
// Step-8: trigger checkpoint-4 to make sure we can continue to do further checkpoints
// ---------------------------------------------------------------------------
synchronized (sourceContext4.getCheckpointLock()) {
// checkpoint 4
source4.snapshotState(new StateSnapshotContextSynchronousImpl(254, 254));
}
assertHistoryState(historyState); // assert the DDL is stored in the history state
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server"));
source4.cancel();
source4.close();
runThread4.sync();
}
}
@Test
public void testRecoverFromRenameOperation() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
{
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
// Step-1: start the source from empty state
final DebeziumSourceFunction<SourceRecord> source = createOracleLogminerSource();
final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
// setup source with empty state
setupSource(source, false, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
// wait until the source finishes the database snapshot
List<SourceRecord> records = drain(sourceContext, 9);
assertEquals(9, records.size());
// state is still empty
assertEquals(0, offsetState.list.size());
assertEquals(0, historyState.list.size());
// create temporary tables which are not in the whitelist
statement.execute(
"CREATE TABLE debezium.tp_001_ogt_products as (select * from debezium.products WHERE 1=2)");
// do some renames
statement.execute("ALTER TABLE DEBEZIUM.PRODUCTS RENAME TO tp_001_del_products");
statement.execute("ALTER TABLE debezium.tp_001_ogt_products RENAME TO PRODUCTS");
statement.execute(
"INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (110,'robot','Toy robot',1.304)"); // 110
statement.execute(
"INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (111,'stream train','Town stream train',1.304)"); // 111
statement.execute(
"INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (112,'cargo train','City cargo train',1.304)"); // 112
int received = drain(sourceContext, 3).size();
assertEquals(3, received);
// Step-2: trigger a checkpoint
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-1
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
assertTrue(historyState.list.size() > 0);
assertTrue(offsetState.list.size() > 0);
source.cancel();
source.close();
runThread.sync();
}
}
{
// Step-3: restore the source from state
final DebeziumSourceFunction<SourceRecord> source2 = createOracleLogminerSource();
final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
setupSource(source2, true, offsetState, historyState, true, 0, 1);
final CheckedThread runThread2 =
new CheckedThread() {
@Override
public void go() throws Exception {
source2.run(sourceContext2);
}
};
runThread2.start();
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2));
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (113,'Airplane','Toy airplane',1.304)"); // 113
List<SourceRecord> records = drain(sourceContext2, 1);
assertEquals(1, records.size());
// assertInsert(records.get(0), "id", 113);
source2.cancel();
source2.close();
runThread2.sync();
}
}
}
@Test
public void testConsumingEmptyTable() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
int prevPos = 0;
{
// ---------------------------------------------------------------------------
// Step-1: start the source from empty state
// ---------------------------------------------------------------------------
DebeziumSourceFunction<SourceRecord> source =
basicSourceBuilder()
.tableList(database.getDatabaseName() + "." + "category")
.build();
// we use a blocking context to block the source before it emits the last snapshot record
final BlockingSourceContext<SourceRecord> sourceContext =
new BlockingSourceContext<>(8);
// setup source with empty state
setupSource(source, false, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
// wait until Debezium is started
while (!source.getDebeziumStarted()) {
Thread.sleep(100);
}
// ---------------------------------------------------------------------------
// Step-2: trigger checkpoint-1
// ---------------------------------------------------------------------------
synchronized (sourceContext.getCheckpointLock()) {
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
// state is still empty
assertEquals(0, offsetState.list.size());
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext));
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("INSERT INTO debezium.category VALUES (1, 'book')");
statement.execute("INSERT INTO debezium.category VALUES (2, 'shoes')");
statement.execute("UPDATE debezium.category SET category_name='books' WHERE id=1");
List<SourceRecord> records = drain(sourceContext, 3);
assertEquals(3, records.size());
// assertInsert(records.get(0), "id", 1);
// assertInsert(records.get(1), "id", 2);
// assertUpdate(records.get(2), "id", 1);
// ---------------------------------------------------------------------------
// Step-4: trigger checkpoint-2 during DML operations
// ---------------------------------------------------------------------------
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-2
source.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
}
assertHistoryState(historyState); // assert the DDL is stored in the history state
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server"));
}
source.cancel();
source.close();
runThread.sync();
}
}
private void assertHistoryState(TestingListState<String> historyState) {
// assert the DDL is stored in the history state
assertTrue(historyState.list.size() > 0);
boolean hasDDL =
historyState.list.stream()
.skip(1)
.anyMatch(
history ->
JsonPath.read(history, "$.source.server")
.equals("oracle_logminer")
&& JsonPath.read(history, "$.position.snapshot")
.toString()
.equals("true")
&& JsonPath.read(history, "$.ddl")
.toString()
.contains("CREATE TABLE \"DEBEZIUM\"."));
assertTrue(hasDDL);
}
// ------------------------------------------------------------------------------------------
// Public Utilities
// ------------------------------------------------------------------------------------------
/** Gets the latest offset of the current database server. */
public static Tuple2<String, Integer> currentMySQLLatestOffset(
UniqueDatabase database, String table, int expectedRecordCount) throws Exception {
DebeziumSourceFunction<SourceRecord> source =
OracleSource.<SourceRecord>builder()
.hostname(ORACLE_CONTAINER.getHost())
.port(ORACLE_CONTAINER.getOraclePort())
.database(database.getDatabaseName())
.tableList(database.getDatabaseName() + "." + table)
.username(ORACLE_CONTAINER.getUsername())
.password(ORACLE_CONTAINER.getPassword())
.deserializer(new OracleSourceTest.ForwardDeserializeSchema())
.build();
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
// ---------------------------------------------------------------------------
// Step-1: start source
// ---------------------------------------------------------------------------
TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
setupSource(source, false, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
drain(sourceContext, expectedRecordCount);
// ---------------------------------------------------------------------------
// Step-2: trigger checkpoint-1 after snapshot finished
// ---------------------------------------------------------------------------
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-1
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
String offsetFile = JsonPath.read(state, "$.sourceOffset.file");
int offsetPos = JsonPath.read(state, "$.sourceOffset.pos");
source.cancel();
source.close();
runThread.sync();
return Tuple2.of(offsetFile, offsetPos);
}
// ------------------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------------------
private DebeziumSourceFunction<SourceRecord> createOracleLogminerSource() {
return basicSourceBuilder().build();
}
private OracleSource.Builder<SourceRecord> basicSourceBuilder() {
return OracleSource.<SourceRecord>builder()
.hostname(ORACLE_CONTAINER.getHost())
.port(ORACLE_CONTAINER.getOraclePort())
.database("XE")
.tableList(
database.getDatabaseName() + "." + "products") // monitor table "products"
.username(ORACLE_CONTAINER.getUsername())
.password(ORACLE_CONTAINER.getPassword())
.deserializer(new ForwardDeserializeSchema());
}
private static <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount)
throws Exception {
List<T> allRecords = new ArrayList<>();
LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
while (allRecords.size() < expectedRecordCount) {
StreamRecord<T> record = queue.poll(100, TimeUnit.SECONDS);
if (record != null) {
allRecords.add(record.getValue());
} else {
throw new RuntimeException(
"Can't receive " + expectedRecordCount + " elements before timeout.");
}
}
return allRecords;
}
private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout)
throws Exception {
final Semaphore semaphore = new Semaphore(0);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(
() -> {
synchronized (checkpointLock) {
semaphore.release();
}
});
boolean result = semaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS);
executor.shutdownNow();
return result;
}
/**
* Wait for a maximum amount of time until the first record is available.
*
* @param timeout the maximum amount of time to wait; must not be negative
* @return {@code true} if records are available, or {@code false} if the timeout occurred and
* no records are available
*/
private boolean waitForAvailableRecords(Duration timeout, TestSourceContext<?> sourceContext)
throws InterruptedException {
long now = System.currentTimeMillis();
long stop = now + timeout.toMillis();
while (System.currentTimeMillis() < stop) {
if (!sourceContext.getCollectedOutputs().isEmpty()) {
break;
}
Thread.sleep(10); // save CPU
}
return !sourceContext.getCollectedOutputs().isEmpty();
}
private static <T> void setupSource(DebeziumSourceFunction<T> source) throws Exception {
setupSource(
source, false, null, null,
true, // enable checkpointing; auto commit should be ignored
0, 1);
}
private static <T, S1, S2> void setupSource(
DebeziumSourceFunction<T> source,
boolean isRestored,
ListState<S1> restoredOffsetState,
ListState<S2> restoredHistoryState,
boolean isCheckpointingEnabled,
int subtaskIndex,
int totalNumSubtasks)
throws Exception {
// run setup procedure in operator life cycle
source.setRuntimeContext(
new MockStreamingRuntimeContext(
isCheckpointingEnabled, totalNumSubtasks, subtaskIndex));
source.initializeState(
new MockFunctionInitializationContext(
isRestored,
new MockOperatorStateStore(restoredOffsetState, restoredHistoryState)));
source.open(new Configuration());
}
/**
* A simple implementation of {@link DebeziumDeserializationSchema} which just forwards the
* {@link SourceRecord}.
*/
public static class ForwardDeserializeSchema
implements DebeziumDeserializationSchema<SourceRecord> {
private static final long serialVersionUID = 2975058057832211228L;
@Override
public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
out.collect(record);
}
@Override
public TypeInformation<SourceRecord> getProducedType() {
return TypeInformation.of(SourceRecord.class);
}
}
private static class MockOperatorStateStore implements OperatorStateStore {
private final ListState<?> restoredOffsetListState;
private final ListState<?> restoredHistoryListState;
private MockOperatorStateStore(
ListState<?> restoredOffsetListState, ListState<?> restoredHistoryListState) {
this.restoredOffsetListState = restoredOffsetListState;
this.restoredHistoryListState = restoredHistoryListState;
}
@Override
@SuppressWarnings("unchecked")
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
throws Exception {
if (stateDescriptor.getName().equals(DebeziumSourceFunction.OFFSETS_STATE_NAME)) {
return (ListState<S>) restoredOffsetListState;
} else if (stateDescriptor
.getName()
.equals(DebeziumSourceFunction.HISTORY_RECORDS_STATE_NAME)) {
return (ListState<S>) restoredHistoryListState;
} else {
throw new IllegalStateException("Unknown state.");
}
}
@Override
public <K, V> BroadcastState<K, V> getBroadcastState(
MapStateDescriptor<K, V> stateDescriptor) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor)
throws Exception {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getRegisteredStateNames() {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getRegisteredBroadcastStateNames() {
throw new UnsupportedOperationException();
}
}
private static class MockFunctionInitializationContext
implements FunctionInitializationContext {
private final boolean isRestored;
private final OperatorStateStore operatorStateStore;
private MockFunctionInitializationContext(
boolean isRestored, OperatorStateStore operatorStateStore) {
this.isRestored = isRestored;
this.operatorStateStore = operatorStateStore;
}
@Override
public boolean isRestored() {
return isRestored;
}
@Override
public OperatorStateStore getOperatorStateStore() {
return operatorStateStore;
}
@Override
public KeyedStateStore getKeyedStateStore() {
throw new UnsupportedOperationException();
}
}
private static class BlockingSourceContext<T> extends TestSourceContext<T> {
private final Semaphore blocker = new Semaphore(0);
private final int expectedCount;
private int currentCount = 0;
private BlockingSourceContext(int expectedCount) {
this.expectedCount = expectedCount;
}
@Override
public void collect(T t) {
super.collect(t);
currentCount++;
if (currentCount == expectedCount) {
try {
// block the source from emitting further records
blocker.acquire();
} catch (InterruptedException e) {
// ignore
}
}
}
}
private static final class TestingListState<T> implements ListState<T> {
private final List<T> list = new ArrayList<>();
private boolean clearCalled = false;
@Override
public void clear() {
list.clear();
clearCalled = true;
}
@Override
public Iterable<T> get() throws Exception {
return list;
}
@Override
public void add(T value) throws Exception {
Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
list.add(value);
}
public List<T> getList() {
return list;
}
boolean isClearCalled() {
return clearCalled;
}
@Override
public void update(List<T> values) throws Exception {
clear();
addAll(values);
}
@Override
public void addAll(List<T> values) throws Exception {
if (values != null) {
values.forEach(
v -> Preconditions.checkNotNull(v, "You cannot add null to a ListState."));
list.addAll(values);
}
}
}
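    // Usage sketch (hypothetical wiring, consistent with the mocks above; the byte[]/String
    // state types are assumptions): build a restored initialization context from two
    // testing list states.
    //
    //   TestingListState<byte[]> offsetState = new TestingListState<>();
    //   TestingListState<String> historyState = new TestingListState<>();
    //   FunctionInitializationContext context =
    //           new MockFunctionInitializationContext(
    //                   true, new MockOperatorStateStore(offsetState, historyState));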
}

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.OracleContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.regex.Pattern;
import java.util.stream.Stream;
/**
 * Base class for testing the Oracle LogMiner source. It starts an Oracle container with LogMiner
 * enabled.
 */
public abstract class OracleTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(OracleTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private static final DockerImageName OC_IMAGE =
DockerImageName.parse("thake/oracle-xe-11g").asCompatibleSubstituteFor("oracle");
protected static final OracleContainer ORACLE_CONTAINER =
new OracleContainer(OC_IMAGE)
.withClasspathResourceMapping(
"docker/setup.sh", "/etc/logminer_conf.sh", BindMode.READ_WRITE)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(ORACLE_CONTAINER)).join();
LOG.info("Containers are started.");
}
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
ORACLE_CONTAINER.getJdbcUrl(),
ORACLE_CONTAINER.getUsername(),
ORACLE_CONTAINER.getPassword());
}
}
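// Usage sketch (hypothetical test, not part of this commit): subclasses inherit the started
// container and can talk to it over JDBC right away. Assumes the usual java.sql and JUnit
// imports.
//
//   public class OracleSmokeTest extends OracleTestBase {
//       @Test
//       public void testContainerIsQueryable() throws Exception {
//           try (Connection conn = getJdbcConnection();
//                   Statement stmt = conn.createStatement();
//                   ResultSet rs = stmt.executeQuery("SELECT 1 FROM DUAL")) {
//               assertTrue("expected one row", rs.next());
//           }
//       }
//   }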

@ -0,0 +1,405 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import com.alibaba.ververica.cdc.connectors.oracle.OracleTestBase;
import com.alibaba.ververica.cdc.connectors.oracle.utils.UniqueDatabase;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.containers.Container;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
/** Integration tests for the Oracle CDC SQL source. */
public class OracleConnectorITCase extends OracleTestBase {
private final UniqueDatabase database =
new UniqueDatabase(ORACLE_CONTAINER, "debezium", "system", "oracle", "inventory_1");
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
@Before
public void before() throws Exception {
TestValuesTableFactory.clearAllData();
Container.ExecResult execResult1 =
ORACLE_CONTAINER.execInContainer("chmod", "+x", "/etc/logminer_conf.sh");
execResult1.getStdout();
execResult1.getStderr();
// Container.ExecResult execResult12 = ORACLE_CONTAINER.execInContainer("chmod", "-R",
// "777", "/u01/app/oracle/");
// execResult12 = ORACLE_CONTAINER.execInContainer("su - oracle");
// execResult12.getStdout();
// execResult12.getStderr();
Container.ExecResult execResult =
ORACLE_CONTAINER.execInContainer("/bin/sh", "-c", "/etc/logminer_conf.sh");
execResult.getStdout();
execResult.getStderr();
env.setParallelism(1);
}
@Test
public void testConsumingAllEvents()
throws SQLException, ExecutionException, InterruptedException {
// inventoryDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " ID INT NOT NULL,"
+ " NAME STRING,"
+ " DESCRIPTION STRING,"
+ " WEIGHT DECIMAL(10,3)"
+ ") WITH ("
+ " 'connector' = 'oracle-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = 'XE',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
"dbzuser",
"dbz",
database.getDatabaseName(),
"products");
String sinkDDL =
"CREATE TABLE sink ("
+ " name STRING,"
+ " weightSum DECIMAL(10,3),"
+ " PRIMARY KEY (name) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result =
tEnv.executeSql(
"INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
waitForSnapshotStarted("sink");
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
statement.execute("UPDATE debezium.products SET WEIGHT='5.1' WHERE ID=107");
statement.execute(
                    "INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)");
statement.execute(
"INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=111");
statement.execute("UPDATE debezium.products SET WEIGHT='5.17' WHERE ID=112");
statement.execute("DELETE FROM debezium.products WHERE ID=112");
}
waitForSinkSize("sink", 20);
        // The final database table looks like this:
        //
        // > SELECT * FROM products;
        // +-----+--------------------+----------------------------------------------------------+--------+
        // | id  | name               | description                                              | weight |
        // +-----+--------------------+----------------------------------------------------------+--------+
        // | 101 | scooter            | Small 2-wheel scooter                                    |   3.14 |
        // | 102 | car battery        | 12V car battery                                          |    8.1 |
        // | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |    0.8 |
        // | 104 | hammer             | 12oz carpenter's hammer                                  |   0.75 |
        // | 105 | hammer             | 14oz carpenter's hammer                                  |  0.875 |
        // | 106 | hammer             | 18oz carpenter hammer                                    |      1 |
        // | 107 | rocks              | box of assorted rocks                                    |    5.1 |
        // | 108 | jacket             | water resistent black wind breaker                       |    0.1 |
        // | 109 | spare tire         | 24 inch spare tire                                       |   22.2 |
        // | 110 | jacket             | new water resistent white wind breaker                   |    0.5 |
        // +-----+--------------------+----------------------------------------------------------+--------+
String[] expected =
new String[] {
"scooter,8.310",
"car battery,8.100",
"12-pack drill bits,0.800",
"hammer,2.625",
"rocks,5.100",
"jacket,1.100",
"spare tire,22.200"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
/*
@Test
public void testAllTypes() throws Throwable {
//database.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE full_types (\n"
+ " id INT NOT NULL,\n"
+ " tiny_c TINYINT,\n"
+ " tiny_un_c SMALLINT ,\n"
+ " small_c SMALLINT,\n"
+ " small_un_c INT,\n"
+ " int_c INT ,\n"
+ " int_un_c BIGINT,\n"
+ " big_c BIGINT,\n"
+ " varchar_c STRING,\n"
+ " char_c STRING,\n"
+ " float_c FLOAT,\n"
+ " double_c DOUBLE,\n"
+ " decimal_c DECIMAL(8, 4),\n"
+ " numeric_c DECIMAL(6, 0),\n"
+ " boolean_c BOOLEAN,\n"
+ " date_c DATE,\n"
+ " time_c TIME(0),\n"
+ " datetime3_c TIMESTAMP(3),\n"
+ " datetime6_c TIMESTAMP(6),\n"
+ " timestamp_c TIMESTAMP(0),\n"
+ " file_uuid BYTES\n"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
database.getUsername(),
database.getPassword(),
database.getDatabaseName(),
"full_types");
String sinkDDL =
"CREATE TABLE sink (\n"
+ " id INT NOT NULL,\n"
+ " tiny_c TINYINT,\n"
+ " tiny_un_c SMALLINT ,\n"
+ " small_c SMALLINT,\n"
+ " small_un_c INT,\n"
+ " int_c INT ,\n"
+ " int_un_c BIGINT,\n"
+ " big_c BIGINT,\n"
+ " varchar_c STRING,\n"
+ " char_c STRING,\n"
+ " float_c FLOAT,\n"
+ " double_c DOUBLE,\n"
+ " decimal_c DECIMAL(8, 4),\n"
+ " numeric_c DECIMAL(6, 0),\n"
+ " boolean_c BOOLEAN,\n"
+ " date_c DATE,\n"
+ " time_c TIME(0),\n"
+ " datetime3_c TIMESTAMP(3),\n"
+ " datetime6_c TIMESTAMP(6),\n"
+ " timestamp_c TIMESTAMP(0),\n"
+ " file_uuid STRING\n"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result =
tEnv.executeSql(
"INSERT INTO sink SELECT id,\n"
+ "tiny_c,\n"
+ "tiny_un_c,\n"
+ "small_c,\n"
+ "small_un_c,\n"
+ "int_c,\n"
+ "int_un_c,\n"
+ "big_c,\n"
+ "varchar_c,\n"
+ "char_c,\n"
+ "float_c,\n"
+ "double_c,\n"
+ "decimal_c,\n"
+ "numeric_c,\n"
+ "boolean_c,\n"
+ "date_c,\n"
+ "time_c,\n"
+ "datetime3_c,\n"
+ "datetime6_c,\n"
+ "timestamp_c,\n"
+ "TO_BASE64(DECODE(file_uuid, 'UTF-8')) FROM full_types");
waitForSnapshotStarted("sink");
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
}
waitForSinkSize("sink", 3);
List<String> expected =
Arrays.asList(
"+I(1,127,255,32767,65535,2147483647,4294967295,9223372036854775807,Hello World,abc,"
+ "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,"
+ "2020-07-17T18:00:22.123456,2020-07-17T18:00:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)",
"-U(1,127,255,32767,65535,2147483647,4294967295,9223372036854775807,Hello World,abc,"
+ "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,"
+ "2020-07-17T18:00:22.123456,2020-07-17T18:00:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)",
"+U(1,127,255,32767,65535,2147483647,4294967295,9223372036854775807,Hello World,abc,"
+ "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,"
+ "2020-07-17T18:00:22.123456,2020-07-17T18:33:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}
*/
@Test
public void testStartupFromLatestOffset() throws Exception {
// database.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " ID INT NOT NULL,"
+ " NAME STRING,"
+ " DESCRIPTION STRING,"
+ " WEIGHT DECIMAL(10,3)"
+ ") WITH ("
+ " 'connector' = 'oracle-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = 'XE',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s' ,"
+ " 'scan.startup.mode' = 'latest-offset'"
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
"dbzuser",
"dbz",
database.getDatabaseName(),
"products");
String sinkDDL =
"CREATE TABLE sink "
+ " WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ") LIKE debezium_source (EXCLUDING OPTIONS)";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
        // wait for the source to start up; we don't have a better way to wait for it, so use sleep for now
Thread.sleep(5000L);
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO debezium.products VALUES (110,'jacket','water resistent white wind breaker',0.2)"); // 110
statement.execute(
"INSERT INTO debezium.products VALUES (111,'scooter','Big 2-wheel scooter ',5.18)");
statement.execute(
"UPDATE debezium.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110");
statement.execute("UPDATE debezium.products SET weight='5.17' WHERE id=111");
statement.execute("DELETE FROM debezium.products WHERE id=111");
}
waitForSinkSize("sink", 7);
String[] expected =
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
// ------------------------------------------------------------------------------------
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(100);
}
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
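    // A hedged variant, not in this commit: the same polling wait with a deadline, so a
    // stuck job fails the test instead of hanging the build forever.
    private static void waitForSinkSize(String sinkName, int expectedSize, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (sinkSize(sinkName) < expectedSize) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException(
                        "Timed out waiting for " + expectedSize + " records in sink " + sinkName);
            }
            Thread.sleep(100);
        }
    }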
}

@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Test;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test for {@link com.alibaba.ververica.cdc.connectors.oracle.table.OracleTableSource} created by
* {@link com.alibaba.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory}.
*/
public class OracleTableSourceFactoryTest {
private static final TableSchema SCHEMA =
TableSchema.builder()
.field("aaa", DataTypes.INT().notNull())
.field("bbb", DataTypes.STRING().notNull())
.field("ccc", DataTypes.DOUBLE())
.field("ddd", DataTypes.DECIMAL(31, 18))
.field("eee", DataTypes.TIMESTAMP(3))
.primaryKey("bbb", "aaa")
.build();
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
private static final String MY_DATABASE = "myDB";
private static final String MY_TABLE = "myTable";
private static final String MY_SCHEMA = "mySchema";
private static final Properties PROPERTIES = new Properties();
@Test
public void testCommonProperties() {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
OracleTableSource expectedSource =
new OracleTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
1521,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_SCHEMA,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@Test
public void testOptionalProperties() {
Map<String, String> options = getAllOptions();
options.put("port", "1521");
options.put("server-time-zone", "Asia/Shanghai");
options.put("debezium.snapshot.mode", "initial");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
dbzProperties.put("snapshot.mode", "initial");
OracleTableSource expectedSource =
new OracleTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
1521,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_SCHEMA,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("Asia/Shanghai"),
dbzProperties,
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@Test
public void testStartupFromInitial() {
Map<String, String> properties = getAllOptions();
properties.put("scan.startup.mode", "initial");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
OracleTableSource expectedSource =
new OracleTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
1521,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_SCHEMA,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
StartupOptions.initial());
assertEquals(expectedSource, actualSource);
}
@Test
public void testStartupFromLatestOffset() {
Map<String, String> properties = getAllOptions();
properties.put("scan.startup.mode", "latest-offset");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
OracleTableSource expectedSource =
new OracleTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
1521,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_SCHEMA,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
StartupOptions.latest());
assertEquals(expectedSource, actualSource);
}
@Test
public void testValidation() {
// validate illegal port
try {
Map<String, String> properties = getAllOptions();
properties.put("port", "123b");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t, "Could not parse value '123b' for key 'port'.")
.isPresent());
}
// validate missing required
Factory factory = new OracleTableSourceFactory();
for (ConfigOption<?> requiredOption : factory.requiredOptions()) {
Map<String, String> properties = getAllOptions();
properties.remove(requiredOption.key());
try {
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"Missing required options are:\n\n" + requiredOption.key())
.isPresent());
}
}
// validate unsupported option
try {
Map<String, String> properties = getAllOptions();
properties.put("unknown", "abc");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(t, "Unsupported options:\n\nunknown")
.isPresent());
}
// validate unsupported option
try {
Map<String, String> properties = getAllOptions();
properties.put("scan.startup.mode", "abc");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
String msg =
"Invalid value for option 'scan.startup.mode'. Supported values are "
+ "[initial, latest-offset], "
+ "but was: abc";
assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent());
}
}
private Map<String, String> getAllOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "oracle-cdc");
options.put("hostname", MY_LOCALHOST);
options.put("database-name", MY_DATABASE);
options.put("table-name", MY_TABLE);
options.put("username", MY_USERNAME);
options.put("password", MY_PASSWORD);
options.put("schema-name", MY_SCHEMA);
return options;
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock source"),
new Configuration(),
OracleTableSourceFactoryTest.class.getClassLoader(),
false);
}
}

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle.utils;
import org.testcontainers.containers.OracleContainer;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
/**
 * Create and populate a unique instance of an Oracle database for each run of a JUnit test. A user
 * of the class needs to provide a logical name for Debezium and a database name. It is expected
 * that there is an init file in <code>src/test/resources/ddl/&lt;database_name&gt;.sql</code>. The
 * database name is enriched with a unique suffix that guarantees complete isolation between runs:
 * <code>&lt;database_name&gt;_&lt;suffix&gt;</code>.
 *
 * <p>This class is inspired by the Debezium project.
 */
public class UniqueDatabase {
private static final String[] CREATE_DATABASE_DDL =
new String[] {"CREATE DATABASE $DBNAME$;", "USE $DBNAME$;"};
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private final OracleContainer container;
private final String databaseName;
private final String templateName;
private final String username;
private final String password;
private final String schemaName = "debezium";
public UniqueDatabase(
OracleContainer container,
String databaseName,
String username,
String password,
String sqlName) {
this(
container,
databaseName,
Integer.toUnsignedString(new Random().nextInt(), 36),
username,
password,
sqlName);
}
private UniqueDatabase(
OracleContainer container,
String databaseName,
final String identifier,
String username,
String password,
String sqlName) {
this.container = container;
this.databaseName = databaseName; // + "_" + identifier;
this.templateName = sqlName;
this.username = username;
this.password = password;
}
public String getDatabaseName() {
return databaseName;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
/** @return Fully qualified table name <code>&lt;databaseName&gt;.&lt;tableName&gt;</code> */
public String qualifiedTableName(final String tableName) {
return String.format("%s.%s", databaseName, tableName);
}
    /** Creates the database and populates it with the initialization SQL script. */
public void createAndInitialize() {
final String ddlFile = String.format("ddl/%s.sql", templateName);
final URL ddlTestFile = UniqueDatabase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try {
try (Connection connection =
DriverManager.getConnection(
container.getJdbcUrl(), username, password);
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.map(this::convertSQL)
.collect(Collectors.joining("\n"))
.split(";"))
.map(x -> x.replace("$$", ";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
}
} catch (final Exception e) {
throw new IllegalStateException(e);
}
}
public Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(container.getJdbcUrl(), username, password);
}
private String convertSQL(final String sql) {
return sql.replace("$DBNAME$", databaseName);
}
}
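// Usage sketch, mirroring OracleConnectorITCase (names are from this commit):
//
//   UniqueDatabase database =
//           new UniqueDatabase(ORACLE_CONTAINER, "debezium", "system", "oracle", "inventory_1");
//   database.createAndInitialize(); // executes src/test/resources/ddl/inventory_1.sql
//   try (Connection connection = database.getJdbcConnection();
//           Statement statement = connection.createStatement()) {
//       statement.executeQuery("SELECT * FROM " + database.qualifiedTableName("products"));
//   }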

@ -0,0 +1,50 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: column_type_test
-- ----------------------------------------------------------------------------------------------------------------
CREATE TABLE full_types (
id INT AUTO_INCREMENT NOT NULL,
tiny_c TINYINT,
tiny_un_c TINYINT UNSIGNED,
small_c SMALLINT,
small_un_c SMALLINT UNSIGNED,
int_c INTEGER ,
int_un_c INTEGER UNSIGNED,
big_c BIGINT,
varchar_c VARCHAR(255),
char_c CHAR(3),
float_c FLOAT,
double_c DOUBLE,
decimal_c DECIMAL(8, 4),
numeric_c NUMERIC(6, 0),
boolean_c BOOLEAN,
date_c DATE,
time_c TIME(0),
datetime3_c DATETIME(3),
datetime6_c DATETIME(6),
timestamp_c TIMESTAMP,
file_uuid BINARY(16),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
INSERT INTO full_types VALUES (
DEFAULT, 127, 255, 32767, 65535, 2147483647, 4294967295, 9223372036854775807,
'Hello World', 'abc', 123.102, 404.4443, 123.4567, 345.6, true,
'2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',
unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400','-',''))
);

@ -0,0 +1,93 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------
-- Create and populate our products using a single insert with many rows
CREATE TABLE HR.products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
ALTER TABLE HR.products AUTO_INCREMENT = 101;
INSERT INTO HR.products
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
(default,"car battery","12V car battery",8.1),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
(default,"hammer","12oz carpenter's hammer",0.75),
(default,"hammer","14oz carpenter's hammer",0.875),
(default,"hammer","16oz carpenter's hammer",1.0),
(default,"rocks","box of assorted rocks",5.3),
(default,"jacket","water resistent black wind breaker",0.1),
(default,"spare tire","24 inch spare tire",22.2);
-- Create and populate the products on hand using multiple inserts
CREATE TABLE HR.products_on_hand (
product_id INTEGER NOT NULL PRIMARY KEY,
quantity INTEGER NOT NULL,
FOREIGN KEY (product_id) REFERENCES products(id)
);
INSERT INTO products_on_hand VALUES (101,3);
INSERT INTO products_on_hand VALUES (102,8);
INSERT INTO products_on_hand VALUES (103,18);
INSERT INTO products_on_hand VALUES (104,4);
INSERT INTO products_on_hand VALUES (105,5);
INSERT INTO products_on_hand VALUES (106,0);
INSERT INTO products_on_hand VALUES (107,44);
INSERT INTO products_on_hand VALUES (108,2);
INSERT INTO products_on_hand VALUES (109,5);
-- Create some customers ...
CREATE TABLE HR.customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
INSERT INTO HR.customers
VALUES (default,"Sally","Thomas","sally.thomas@acme.com"),
(default,"George","Bailey","gbailey@foobar.com"),
(default,"Edward","Walker","ed@walker.com"),
(default,"Anne","Kretchmar","annek@noanswer.org");
-- Create some very simple orders
CREATE TABLE HR.orders (
order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATE NOT NULL,
purchaser INTEGER NOT NULL,
quantity INTEGER NOT NULL,
product_id INTEGER NOT NULL,
FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
) AUTO_INCREMENT = 10001;
INSERT INTO HR.orders
VALUES (default, '2016-01-16', 1001, 1, 102),
(default, '2016-01-17', 1002, 2, 105),
(default, '2016-02-18', 1004, 3, 109),
(default, '2016-02-19', 1002, 2, 106),
       (default, '2016-02-21', 1003, 1, 107);
CREATE TABLE HR.category (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
category_name VARCHAR(255)
);

@ -0,0 +1,48 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: inventory
-- ----------------------------------------------------------------------------------------------------------------
-- Create and populate our products using a single insert with many rows
CREATE TABLE HR.products (
id INTEGER NOT NULL,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT,
PRIMARY KEY(id)
);
ALTER TABLE HR.products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Auto-generated SQL script #202106141039
INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (101,'scooter','Small 2-wheel scooter',3.14);
INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (102,'car battery','12V car battery',8.1);
INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (104,'hammer','12oz carpenters hammer',0.75);
INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (105,'hammer','14oz carpenters hammer',0.875);
INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (106,'hammer','16oz carpenters hammer',1.0);
INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (107,'rocks','box of assorted rocks',5.3);
INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (108,'jacket','water resistent black wind breaker',0.1);
INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (109,'spare tire','24 inch spare tire',22.2);

@ -0,0 +1,114 @@
#!/bin/sh
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
mkdir /u01/app/oracle/oradata/recovery_area
# Set archive log mode and enable GG replication
ORACLE_SID=XE
export ORACLE_SID
#echo 'export PATH=$ORACLE_HOME/bin:$PATH' >> /etc/bash.bashrc
export ORACLE_HOME=/u01/app/oracle/product/11.2.0/xe
# Create Log Miner Tablespace and User
/u01/app/oracle/product/11.2.0/xe/bin/sqlplus sys/oracle@//localhost:1521/XE as sysdba <<- EOF
CREATE TABLESPACE LOGMINER_TBS DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
EOF
/u01/app/oracle/product/11.2.0/xe/bin/sqlplus sys/oracle@//localhost:1521/XE as sysdba <<- EOF
CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT SELECT ANY DICTIONARY TO dbzuser;
GRANT CREATE TABLE TO dbzuser;
GRANT ALTER ANY TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS to dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser;
GRANT SELECT ON V_$LOG TO dbzuser;
GRANT SELECT ON V_$LOG_HISTORY TO dbzuser;
exit;
EOF
/u01/app/oracle/product/11.2.0/xe/bin/sqlplus sys/oracle@//localhost:1521/XE as sysdba <<- EOF
CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE to debezium;
ALTER USER debezium QUOTA 100M on users;
exit;
EOF
/u01/app/oracle/product/11.2.0/xe/bin/sqlplus sys/oracle@//localhost:1521/XE as sysdba <<- EOF
CREATE TABLE debezium.products (
ID INTEGER NOT NULL,
NAME VARCHAR(255) NOT NULL,
DESCRIPTION VARCHAR(512),
WEIGHT FLOAT,
PRIMARY KEY(ID)
);
CREATE TABLE debezium.category (
ID INTEGER NOT NULL,
CATEGORY_NAME VARCHAR(255),
PRIMARY KEY(ID)
);
ALTER TABLE debezium.products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE debezium.category ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (101,'scooter','Small 2-wheel scooter',3.14);
INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (102,'car battery','12V car battery',8.1);
INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (104,'hammer','12oz carpenters hammer',0.75);
INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (105,'hammer','14oz carpenters hammer',0.875);
INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (106,'hammer','16oz carpenters hammer',1.0);
INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (107,'rocks','box of assorted rocks',5.3);
INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (108,'jacket','water resistent black wind breaker',0.1);
INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT)
VALUES (109,'spare tire','24 inch spare tire',22.2);
exit;
EOF

@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-cdc-connectors</artifactId>
<groupId>com.alibaba.ververica</groupId>
<version>1.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<dependencies>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.kafka:*</artifact>
<excludes>
<exclude>kafka/kafka-version.properties</exclude>
<exclude>LICENSE</exclude>
<!-- Does not contain anything relevant.
Cites a binary dependency on jersey, but this is neither reflected in the
dependency graph, nor are any jersey files bundled. -->
<exclude>NOTICE</exclude>
<exclude>common/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.oracle;
/** This is used to generate a dummy docs jar for this module to pass OSS repository rule. */
public class DummyDocs {}

@ -0,0 +1,6 @@
flink-sql-connector-oracle-cdc
Copyright 2020 Ververica Inc.
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
- org.apache.kafka:kafka-clients:2.5.0

@ -37,10 +37,13 @@ under the License.
<module>flink-connector-test-util</module> <module>flink-connector-test-util</module>
<module>flink-connector-mysql-cdc</module> <module>flink-connector-mysql-cdc</module>
<module>flink-connector-postgres-cdc</module> <module>flink-connector-postgres-cdc</module>
<module>flink-connector-oracle-cdc</module>
<module>flink-connector-mongodb-cdc</module> <module>flink-connector-mongodb-cdc</module>
<module>flink-sql-connector-mysql-cdc</module> <module>flink-sql-connector-mysql-cdc</module>
<module>flink-sql-connector-postgres-cdc</module> <module>flink-sql-connector-postgres-cdc</module>
<module>flink-sql-connector-mongodb-cdc</module> <module>flink-sql-connector-mongodb-cdc</module>
<module>flink-sql-connector-postgres-cdc</module>
<module>flink-sql-connector-oracle-cdc</module>
<module>flink-format-changelog-json</module> <module>flink-format-changelog-json</module>
</modules> </modules>
