diff --git a/flink-connector-debezium/pom.xml b/flink-connector-debezium/pom.xml index e228e0d31..936a18378 100644 --- a/flink-connector-debezium/pom.xml +++ b/flink-connector-debezium/pom.xml @@ -34,12 +34,12 @@ under the License. io.debezium debezium-api - ${debezium.version} + 1.5.0.Final io.debezium debezium-embedded - ${debezium.version} + 1.5.0.Final kafka-log4j-appender diff --git a/flink-connector-oracle-cdc/pom.xml b/flink-connector-oracle-cdc/pom.xml new file mode 100644 index 000000000..4ca191763 --- /dev/null +++ b/flink-connector-oracle-cdc/pom.xml @@ -0,0 +1,171 @@ + + + + + flink-cdc-connectors + com.alibaba.ververica + 1.3-SNAPSHOT + + 4.0.0 + + flink-connector-oracle-cdc + flink-connector-oracle-cdc + jar + + + + + + com.alibaba.ververica + flink-connector-debezium + ${project.version} + + + kafka-log4j-appender + org.apache.kafka + + + + + + io.debezium + debezium-connector-oracle + 1.5.0.Final + + + + + + com.alibaba.ververica + flink-connector-test-util + ${project.version} + test + + + + io.debezium + debezium-core + 1.5.0.Final + test-jar + test + + + + + + org.apache.flink + flink-table-planner-blink_${scala.binary.version} + ${flink.version} + test + + + com.oracle.ojdbc + ojdbc8 + 19.3.0.0 + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${flink.version} + test + + + + org.apache.flink + flink-core + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-tests + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-planner-blink_${scala.binary.version} + ${flink.version} + test-jar + test + + + + + + org.testcontainers + oracle-xe + ${testcontainers.version} + test + + + + com.jayway.jsonpath + json-path + 2.4.0 + test + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + test + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + 1 + -Xms256m -Xmx2048m -Dlog4j.configurationFile=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit + + + + + + diff --git a/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/OracleSource.java b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/OracleSource.java new file mode 100644 index 000000000..f40222cc5 --- /dev/null +++ b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/OracleSource.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle; + +import com.alibaba.ververica.cdc.connectors.oracle.table.StartupOptions; +import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; +import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffset; +import io.debezium.connector.oracle.OracleConnector; + +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A builder to build a SourceFunction which can read snapshot and continue to consume log miner. + */ +public class OracleSource { + + private static final String DATABASE_SERVER_NAME = "oracle_logminer"; + + public static Builder builder() { + return new Builder<>(); + } + + /** Builder class of {@link OracleSource}. */ + public static class Builder { + + private int port = 1521; // default 3306 port + private String hostname; + private String database; + private String username; + private String password; + private String serverTimeZone; + private String[] tableList; + private String[] schemaList; + private Properties dbzProperties; + private StartupOptions startupOptions = StartupOptions.initial(); + private DebeziumDeserializationSchema deserializer; + + public Builder hostname(String hostname) { + this.hostname = hostname; + return this; + } + + /** Integer port number of the Oracle database server. */ + public Builder port(int port) { + this.port = port; + return this; + } + + /** + * An optional list of regular expressions that match database names to be monitored; any + * database name not included in the whitelist will be excluded from monitoring. By default + * all databases will be monitored. + */ + public Builder database(String database) { + this.database = database; + return this; + } + + /** + * An optional list of regular expressions that match fully-qualified table identifiers for + * tables to be monitored; any table not included in the list will be excluded from + * monitoring. Each identifier is of the form databaseName.tableName. By default the + * connector will monitor every non-system table in each monitored database. + */ + public Builder tableList(String... tableList) { + this.tableList = tableList; + return this; + } + + /** + * An optional list of regular expressions that match schema names to be monitored; any + * schema name not included in the whitelist will be excluded from monitoring. By default + * all non-system schemas will be monitored. + */ + public Builder schemaList(String... schemaList) { + this.schemaList = schemaList; + return this; + } + + /** Name of the Oracle database to use when connecting to the Oracle database server. */ + public Builder username(String username) { + this.username = username; + return this; + } + + /** Password to use when connecting to the Oracle database server. */ + public Builder password(String password) { + this.password = password; + return this; + } + + /** + * The session time zone in database server, e.g. "America/Los_Angeles". It controls how the + * TIMESTAMP type in Oracle converted to STRING. See more + * https://debezium.io/documentation/reference/1.5/connectors/oracle.html#oracle-temporal-types + */ + public Builder serverTimeZone(String timeZone) { + this.serverTimeZone = timeZone; + return this; + } + + /** The Debezium Oracle connector properties. For example, "snapshot.mode". */ + public Builder debeziumProperties(Properties properties) { + this.dbzProperties = properties; + return this; + } + + /** + * The deserializer used to convert from consumed {@link + * org.apache.kafka.connect.source.SourceRecord}. + */ + public Builder deserializer(DebeziumDeserializationSchema deserializer) { + this.deserializer = deserializer; + return this; + } + + /** Specifies the startup options. */ + public Builder startupOptions(StartupOptions startupOptions) { + this.startupOptions = startupOptions; + return this; + } + + public DebeziumSourceFunction build() { + Properties props = new Properties(); + props.setProperty("connector.class", OracleConnector.class.getCanonicalName()); + // Logical name that identifies and provides a namespace for the particular Oracle + // database server being + // monitored. The logical name should be unique across all other connectors, since it is + // used as a prefix + // for all Kafka topic names emanating from this connector. Only alphanumeric characters + // and + // underscores should be used. + props.setProperty("database.server.name", DATABASE_SERVER_NAME); + props.setProperty("database.hostname", checkNotNull(hostname)); + props.setProperty("database.user", checkNotNull(username)); + props.setProperty("database.password", checkNotNull(password)); + props.setProperty("database.port", String.valueOf(port)); + props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true)); + props.setProperty("database.dbname", checkNotNull(database)); + if (schemaList != null) { + props.setProperty("schema.whitelist", String.join(",", schemaList)); + } + if (tableList != null) { + props.setProperty("table.include.list", String.join(",", tableList)); + } + if (serverTimeZone != null) { + props.setProperty("database.serverTimezone", serverTimeZone); + } + + DebeziumOffset specificOffset = null; + switch (startupOptions.startupMode) { + case INITIAL: + props.setProperty("snapshot.mode", "initial"); + break; + + case LATEST_OFFSET: + props.setProperty("snapshot.mode", "schema_only"); + break; + + default: + throw new UnsupportedOperationException(); + } + + if (dbzProperties != null) { + dbzProperties.forEach(props::put); + } + + return new DebeziumSourceFunction<>(deserializer, props, specificOffset); + } + } +} diff --git a/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleTableSource.java b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleTableSource.java new file mode 100644 index 000000000..2e87728a5 --- /dev/null +++ b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleTableSource.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle.table; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import com.alibaba.ververica.cdc.connectors.oracle.OracleSource; +import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; +import com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; + +import java.time.ZoneId; +import java.util.Objects; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link DynamicTableSource} that describes how to create a Oracle binlog from a logical + * description. + */ +public class OracleTableSource implements ScanTableSource { + + private final TableSchema physicalSchema; + private final int port; + private final String hostname; + private final String database; + private final String username; + private final String password; + private final String tableName; + private final String schemaName; + private final ZoneId serverTimeZone; + private final Properties dbzProperties; + private final StartupOptions startupOptions; + + public OracleTableSource( + TableSchema physicalSchema, + int port, + String hostname, + String database, + String tableName, + String schemaName, + String username, + String password, + ZoneId serverTimeZone, + Properties dbzProperties, + StartupOptions startupOptions) { + this.physicalSchema = physicalSchema; + this.port = port; + this.hostname = checkNotNull(hostname); + this.database = checkNotNull(database); + this.tableName = checkNotNull(tableName); + this.schemaName = checkNotNull(schemaName); + this.username = checkNotNull(username); + this.password = checkNotNull(password); + this.serverTimeZone = serverTimeZone; + this.dbzProperties = dbzProperties; + this.startupOptions = startupOptions; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); + TypeInformation typeInfo = + scanContext.createTypeInformation(physicalSchema.toRowDataType()); + DebeziumDeserializationSchema deserializer = + new RowDataDebeziumDeserializeSchema( + rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone); + OracleSource.Builder builder = + OracleSource.builder() + .hostname(hostname) + .port(port) + .database(database) + .tableList(schemaName + "." + tableName) + .schemaList(schemaName) + .username(username) + .password(password) + .serverTimeZone(serverTimeZone.toString()) + .debeziumProperties(dbzProperties) + .startupOptions(startupOptions) + .deserializer(deserializer); + DebeziumSourceFunction sourceFunction = builder.build(); + + return SourceFunctionProvider.of(sourceFunction, false); + } + + @Override + public DynamicTableSource copy() { + return new OracleTableSource( + physicalSchema, + port, + hostname, + database, + tableName, + schemaName, + username, + password, + serverTimeZone, + dbzProperties, + startupOptions); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + OracleTableSource that = (OracleTableSource) o; + return port == that.port + && Objects.equals(physicalSchema, that.physicalSchema) + && Objects.equals(hostname, that.hostname) + && Objects.equals(database, that.database) + && Objects.equals(username, that.username) + && Objects.equals(password, that.password) + && Objects.equals(tableName, that.tableName) + && Objects.equals(schemaName, that.schemaName) + && Objects.equals(serverTimeZone, that.serverTimeZone) + && Objects.equals(dbzProperties, that.dbzProperties) + && Objects.equals(startupOptions, that.startupOptions); + } + + @Override + public int hashCode() { + return Objects.hash( + physicalSchema, + port, + hostname, + database, + username, + password, + tableName, + schemaName, + serverTimeZone, + dbzProperties, + startupOptions); + } + + @Override + public String asSummaryString() { + return "Oracle-CDC"; + } +} diff --git a/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java new file mode 100644 index 000000000..d960ade4d --- /dev/null +++ b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; + +import com.alibaba.ververica.cdc.debezium.table.DebeziumOptions; + +import java.time.ZoneId; +import java.util.HashSet; +import java.util.Set; + +import static com.alibaba.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; + +/** + * Factory for creating configured instance of {@link + * com.alibaba.ververica.cdc.connectors.oracle.table.OracleTableSource}. + */ +public class OracleTableSourceFactory implements DynamicTableSourceFactory { + + private static final String IDENTIFIER = "oracle-cdc"; + + private static final ConfigOption HOSTNAME = + ConfigOptions.key("hostname") + .stringType() + .noDefaultValue() + .withDescription("IP address or hostname of the Oracle database server."); + + private static final ConfigOption PORT = + ConfigOptions.key("port") + .intType() + .defaultValue(1521) + .withDescription("Integer port number of the Oracle database server."); + + private static final ConfigOption USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription( + "Name of the Oracle database to use when connecting to the Oracle database server."); + + private static final ConfigOption PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription( + "Password to use when connecting to the oracle database server."); + + private static final ConfigOption DATABASE_NAME = + ConfigOptions.key("database-name") + .stringType() + .noDefaultValue() + .withDescription("Database name of the Oracle server to monitor."); + + private static final ConfigOption SCHEMA_NAME = + ConfigOptions.key("schema-name") + .stringType() + .noDefaultValue() + .withDescription("Schema name of the Oracle database to monitor."); + + private static final ConfigOption TABLE_NAME = + ConfigOptions.key("table-name") + .stringType() + .noDefaultValue() + .withDescription("Table name of the Oracle database to monitor."); + + private static final ConfigOption SERVER_TIME_ZONE = + ConfigOptions.key("server-time-zone") + .stringType() + .defaultValue("UTC") + .withDescription("The session time zone in database server."); + + public static final ConfigOption SCAN_STARTUP_MODE = + ConfigOptions.key("scan.startup.mode") + .stringType() + .defaultValue("initial") + .withDescription( + "Optional startup mode for Oracle CDC consumer, valid enumerations are " + + "\"initial\", \"latest-offset\""); + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX); + + final ReadableConfig config = helper.getOptions(); + String hostname = config.get(HOSTNAME); + String username = config.get(USERNAME); + String password = config.get(PASSWORD); + String databaseName = config.get(DATABASE_NAME); + String tableName = config.get(TABLE_NAME); + String schemaName = config.get(SCHEMA_NAME); + int port = config.get(PORT); + ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE)); + StartupOptions startupOptions = getStartupOptions(config); + TableSchema physicalSchema = + TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + + return new OracleTableSource( + physicalSchema, + port, + hostname, + databaseName, + tableName, + schemaName, + username, + password, + serverTimeZone, + getDebeziumProperties(context.getCatalogTable().getOptions()), + startupOptions); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(HOSTNAME); + options.add(USERNAME); + options.add(PASSWORD); + options.add(DATABASE_NAME); + options.add(TABLE_NAME); + options.add(SCHEMA_NAME); + return options; + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(PORT); + options.add(SERVER_TIME_ZONE); + options.add(SCAN_STARTUP_MODE); + + return options; + } + + private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; + + private static StartupOptions getStartupOptions(ReadableConfig config) { + String modeString = config.get(SCAN_STARTUP_MODE); + + switch (modeString.toLowerCase()) { + case SCAN_STARTUP_MODE_VALUE_INITIAL: + return StartupOptions.initial(); + + case SCAN_STARTUP_MODE_VALUE_LATEST: + return StartupOptions.latest(); + + default: + throw new ValidationException( + String.format( + "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s", + SCAN_STARTUP_MODE.key(), + SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_LATEST, + modeString)); + } + } +} diff --git a/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/StartupMode.java b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/StartupMode.java new file mode 100644 index 000000000..28d2447f0 --- /dev/null +++ b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/StartupMode.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle.table; + +/** + * Startup modes for the Oracle CDC Consumer. + * + * @see StartupOptions + */ +public enum StartupMode { + INITIAL, + LATEST_OFFSET, +} diff --git a/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/StartupOptions.java b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/StartupOptions.java new file mode 100644 index 000000000..ea317b1bc --- /dev/null +++ b/flink-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/table/StartupOptions.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle.table; + +import java.util.Objects; + +/** Debezium startup options. */ +public final class StartupOptions { + public final StartupMode startupMode; + + /** + * Performs an initial snapshot on the monitored database tables upon first startup, and + * continue to read the latest logminer. + */ + public static StartupOptions initial() { + return new StartupOptions(StartupMode.INITIAL); + } + + /** + * Never to perform snapshot on the monitored database tables upon first startup, just read from + * the end of the logminer which means only have the changes since the connector was started. + */ + public static StartupOptions latest() { + return new StartupOptions(StartupMode.LATEST_OFFSET); + } + + private StartupOptions(StartupMode startupMode) { + this.startupMode = startupMode; + + switch (startupMode) { + case INITIAL: + + case LATEST_OFFSET: + break; + + default: + throw new UnsupportedOperationException(startupMode + " mode is not supported."); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StartupOptions that = (StartupOptions) o; + return startupMode == that.startupMode; + } + + @Override + public int hashCode() { + return Objects.hash(startupMode); + } +} diff --git a/flink-connector-oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 000000000..9144208f7 --- /dev/null +++ b/flink-connector-oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.alibaba.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory diff --git a/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/OracleSourceTest.java b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/OracleSourceTest.java new file mode 100644 index 000000000..1741816db --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/OracleSourceTest.java @@ -0,0 +1,894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle; + +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import com.alibaba.ververica.cdc.connectors.oracle.utils.UniqueDatabase; +import com.alibaba.ververica.cdc.connectors.utils.TestSourceContext; +import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; +import com.jayway.jsonpath.JsonPath; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Before; +import org.junit.Test; +import org.testcontainers.containers.Container; + +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** Tests for {@link OracleSource} which also heavily tests {@link DebeziumSourceFunction}. */ +public class OracleSourceTest extends OracleTestBase { + + private final UniqueDatabase database = + new UniqueDatabase(ORACLE_CONTAINER, "debezium", "system", "oracle", "inventory_1"); + + @Before + public void before() throws Exception { + Container.ExecResult execResult1 = + ORACLE_CONTAINER.execInContainer("chmod", "+x", "/etc/logminer_conf.sh"); + + execResult1.getStdout(); + execResult1.getStderr(); + + // Container.ExecResult execResult12 = ORACLE_CONTAINER.execInContainer("chmod", "-R", + // "777", "/u01/app/oracle/"); + // execResult12 = ORACLE_CONTAINER.execInContainer("su - oracle"); + + // execResult12.getStdout(); + // execResult12.getStderr(); + Container.ExecResult execResult = + ORACLE_CONTAINER.execInContainer("/bin/sh", "-c", "/etc/logminer_conf.sh"); + execResult.getStdout(); + execResult.getStderr(); + // database.createAndInitialize(); + + } + + @Test + public void testConsumingAllEvents() throws Exception { + DebeziumSourceFunction source = createOracleLogminerSource(); + TestSourceContext sourceContext = new TestSourceContext<>(); + + setupSource(source); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + // start the source + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + List records = drain(sourceContext, 9); + assertEquals(9, records.size()); + for (int i = 0; i < records.size(); i++) { + // assertInsert(records.get(i), "ID", 101 + i); + } + + statement.execute( + "INSERT INTO debezium.products VALUES (110,'robot','Toy robot',1.304)"); // 110 + records = drain(sourceContext, 1); + // assertInsert(records.get(0), "id", 110); + + statement.execute( + "INSERT INTO debezium.products VALUES (1001,'roy','old robot',1234.56)"); // 1001 + records = drain(sourceContext, 1); + // assertInsert(records.get(0), "id", 1001); + + // --------------------------------------------------------------------------------------------------------------- + // Changing the primary key of a row should result in 2 events: INSERT, DELETE + // (TOMBSTONE is dropped) + // --------------------------------------------------------------------------------------------------------------- + statement.execute( + "UPDATE debezium.products SET id=2001, description='really old robot' WHERE id=1001"); + records = drain(sourceContext, 2); + // assertDelete(records.get(0), "id", 1001); + // assertInsert(records.get(1), "id", 2001); + + // --------------------------------------------------------------------------------------------------------------- + // Simple UPDATE (with no schema changes) + // --------------------------------------------------------------------------------------------------------------- + statement.execute("UPDATE debezium.products SET weight=1345.67 WHERE id=2001"); + records = drain(sourceContext, 1); + // assertUpdate(records.get(0), "id", 2001); + + // --------------------------------------------------------------------------------------------------------------- + // Change our schema with a fully-qualified name; we should still see this event + // --------------------------------------------------------------------------------------------------------------- + // Add a column with default to the 'products' table and explicitly update one record + // ... + statement.execute( + String.format("ALTER TABLE %s.products ADD volume FLOAT", "debezium")); + statement.execute("UPDATE debezium.products SET volume=13.5 WHERE id=2001"); + records = drain(sourceContext, 1); + // assertUpdate(records.get(0), "id", 2001); + + // cleanup + source.cancel(); + source.close(); + runThread.sync(); + } + } + + @Test + public void testCheckpointAndRestore() throws Exception { + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + int prevPos = 0; + { + // --------------------------------------------------------------------------- + // Step-1: start the source from empty state + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source = createOracleLogminerSource(); + // we use blocking context to block the source to emit before last snapshot record + final BlockingSourceContext sourceContext = + new BlockingSourceContext<>(8); + // setup source with empty state + setupSource(source, false, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + // wait until consumer is started + int received = drain(sourceContext, 2).size(); + assertEquals(2, received); + + // we can't perform checkpoint during DB snapshot + assertFalse( + waitForCheckpointLock( + sourceContext.getCheckpointLock(), Duration.ofSeconds(3))); + + // unblock the source context to continue the processing + sourceContext.blocker.release(); + // wait until the source finishes the database snapshot + List records = drain(sourceContext, 9 - received); + assertEquals(9, records.size() + received); + + // state is still empty + assertEquals(0, offsetState.list.size()); + assertEquals(0, historyState.list.size()); + + // --------------------------------------------------------------------------- + // Step-2: trigger checkpoint-1 after snapshot finished + // --------------------------------------------------------------------------- + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + + assertHistoryState(historyState); + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server")); + // assertEquals("mysql-bin.000003", JsonPath.read(state, "$.sourceOffset.file")); + assertFalse(state.contains("row")); + assertFalse(state.contains("server_id")); + assertFalse(state.contains("event")); + // int pos = JsonPath.read(state, "$.sourceOffset.pos"); + // assertTrue(pos > prevPos); + // prevPos = pos; + + source.cancel(); + source.close(); + runThread.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-3: restore the source from state + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source2 = createOracleLogminerSource(); + final TestSourceContext sourceContext2 = new TestSourceContext<>(); + setupSource(source2, true, offsetState, historyState, true, 0, 1); + final CheckedThread runThread2 = + new CheckedThread() { + @Override + public void go() throws Exception { + source2.run(sourceContext2); + } + }; + runThread2.start(); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2)); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "INSERT INTO debezium.products VALUES (110,'robot','Toy robot',1.304)"); // 110 + List records = drain(sourceContext2, 1); + assertEquals(1, records.size()); + /// assertInsert(records.get(0), "id", 110); + + // --------------------------------------------------------------------------- + // Step-4: trigger checkpoint-2 during DML operations + // --------------------------------------------------------------------------- + synchronized (sourceContext2.getCheckpointLock()) { + // trigger checkpoint-1 + source2.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138)); + } + + assertHistoryState(historyState); // assert the DDL is stored in the history state + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server")); + + // execute 2 more DMLs to have more binlog + statement.execute( + "INSERT INTO debezium.products VALUES (1001,'roy','old robot',1234.56)"); // 1001 + statement.execute("UPDATE debezium.products SET weight=1345.67 WHERE id=1001"); + } + + // cancel the source + source2.cancel(); + source2.close(); + runThread2.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-5: restore the source from checkpoint-2 + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source3 = createOracleLogminerSource(); + final TestSourceContext sourceContext3 = new TestSourceContext<>(); + setupSource(source3, true, offsetState, historyState, true, 0, 1); + + // restart the source + final CheckedThread runThread3 = + new CheckedThread() { + @Override + public void go() throws Exception { + source3.run(sourceContext3); + } + }; + runThread3.start(); + + // consume the unconsumed binlog + List records = drain(sourceContext3, 2); + // assertInsert(records.get(0), "id", 1001); + // assertUpdate(records.get(1), "id", 1001); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext3)); + + // can continue to receive new events + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("DELETE FROM debezium.products WHERE id=1001"); + } + records = drain(sourceContext3, 1); + // assertDelete(records.get(0), "id", 1001); + + // --------------------------------------------------------------------------- + // Step-6: trigger checkpoint-2 to make sure we can continue to to further checkpoints + // --------------------------------------------------------------------------- + synchronized (sourceContext3.getCheckpointLock()) { + // checkpoint 3 + source3.snapshotState(new StateSnapshotContextSynchronousImpl(233, 233)); + } + assertHistoryState(historyState); // assert the DDL is stored in the history state + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server")); + + source3.cancel(); + source3.close(); + runThread3.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-7: restore the source from checkpoint-3 + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source4 = createOracleLogminerSource(); + final TestSourceContext sourceContext4 = new TestSourceContext<>(); + setupSource(source4, true, offsetState, historyState, true, 0, 1); + + // restart the source + final CheckedThread runThread4 = + new CheckedThread() { + @Override + public void go() throws Exception { + source4.run(sourceContext4); + } + }; + runThread4.start(); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext4)); + + // --------------------------------------------------------------------------- + // Step-8: trigger checkpoint-3 to make sure we can continue to to further checkpoints + // --------------------------------------------------------------------------- + synchronized (sourceContext4.getCheckpointLock()) { + // checkpoint 4 + source4.snapshotState(new StateSnapshotContextSynchronousImpl(254, 254)); + } + assertHistoryState(historyState); // assert the DDL is stored in the history state + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server")); + + source4.cancel(); + source4.close(); + runThread4.sync(); + } + } + + @Test + public void testRecoverFromRenameOperation() throws Exception { + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + + { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // Step-1: start the source from empty state + final DebeziumSourceFunction source = createOracleLogminerSource(); + final TestSourceContext sourceContext = new TestSourceContext<>(); + // setup source with empty state + setupSource(source, false, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + // wait until the source finishes the database snapshot + List records = drain(sourceContext, 9); + assertEquals(9, records.size()); + + // state is still empty + assertEquals(0, offsetState.list.size()); + assertEquals(0, historyState.list.size()); + + // create temporary tables which are not in the whitelist + statement.execute( + "CREATE TABLE debezium.tp_001_ogt_products as (select * from debezium.products WHERE 1=2)"); + // do some renames + statement.execute("ALTER TABLE DEBEZIUM.PRODUCTS RENAME TO tp_001_del_products"); + + statement.execute("ALTER TABLE debezium.tp_001_ogt_products RENAME TO PRODUCTS"); + statement.execute( + "INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (110,'robot','Toy robot',1.304)"); // 110 + statement.execute( + "INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (111,'stream train','Town stream train',1.304)"); // 111 + statement.execute( + "INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (112,'cargo train','City cargo train',1.304)"); // 112 + + int received = drain(sourceContext, 3).size(); + assertEquals(3, received); + + // Step-2: trigger a checkpoint + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + + assertTrue(historyState.list.size() > 0); + assertTrue(offsetState.list.size() > 0); + + source.cancel(); + source.close(); + runThread.sync(); + } + } + + { + // Step-3: restore the source from state + final DebeziumSourceFunction source2 = createOracleLogminerSource(); + final TestSourceContext sourceContext2 = new TestSourceContext<>(); + setupSource(source2, true, offsetState, historyState, true, 0, 1); + final CheckedThread runThread2 = + new CheckedThread() { + @Override + public void go() throws Exception { + source2.run(sourceContext2); + } + }; + runThread2.start(); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2)); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) VALUES (113,'Airplane','Toy airplane',1.304)"); // 113 + List records = drain(sourceContext2, 1); + assertEquals(1, records.size()); + // assertInsert(records.get(0), "id", 113); + + source2.cancel(); + source2.close(); + runThread2.sync(); + } + } + } + + @Test + public void testConsumingEmptyTable() throws Exception { + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + int prevPos = 0; + { + // --------------------------------------------------------------------------- + // Step-1: start the source from empty state + // --------------------------------------------------------------------------- + DebeziumSourceFunction source = + basicSourceBuilder() + .tableList(database.getDatabaseName() + "." + "category") + .build(); + // we use blocking context to block the source to emit before last snapshot record + final BlockingSourceContext sourceContext = + new BlockingSourceContext<>(8); + // setup source with empty state + setupSource(source, false, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + // wait until Debezium is started + while (!source.getDebeziumStarted()) { + Thread.sleep(100); + } + // --------------------------------------------------------------------------- + // Step-2: trigger checkpoint-1 + // --------------------------------------------------------------------------- + synchronized (sourceContext.getCheckpointLock()) { + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + + // state is still empty + assertEquals(0, offsetState.list.size()); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext)); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute("INSERT INTO debezium.category VALUES (1, 'book')"); + statement.execute("INSERT INTO debezium.category VALUES (2, 'shoes')"); + statement.execute("UPDATE debezium.category SET category_name='books' WHERE id=1"); + List records = drain(sourceContext, 3); + assertEquals(3, records.size()); + // assertInsert(records.get(0), "id", 1); + // assertInsert(records.get(1), "id", 2); + // assertUpdate(records.get(2), "id", 1); + + // --------------------------------------------------------------------------- + // Step-4: trigger checkpoint-2 during DML operations + // --------------------------------------------------------------------------- + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138)); + } + + assertHistoryState(historyState); // assert the DDL is stored in the history state + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("oracle_logminer", JsonPath.read(state, "$.sourcePartition.server")); + } + + source.cancel(); + source.close(); + runThread.sync(); + } + } + + private void assertHistoryState(TestingListState historyState) { + // assert the DDL is stored in the history state + assertTrue(historyState.list.size() > 0); + boolean hasDDL = + historyState.list.stream() + .skip(1) + .anyMatch( + history -> + JsonPath.read(history, "$.source.server") + .equals("oracle_logminer") + && JsonPath.read(history, "$.position.snapshot") + .toString() + .equals("true") + && JsonPath.read(history, "$.ddl") + .toString() + .contains("CREATE TABLE \"DEBEZIUM\".")); + + assertTrue(hasDDL); + } + + // ------------------------------------------------------------------------------------------ + // Public Utilities + // ------------------------------------------------------------------------------------------ + + /** Gets the latest offset of current MySQL server. */ + public static Tuple2 currentMySQLLatestOffset( + UniqueDatabase database, String table, int expectedRecordCount) throws Exception { + DebeziumSourceFunction source = + OracleSource.builder() + .hostname(ORACLE_CONTAINER.getHost()) + .port(ORACLE_CONTAINER.getOraclePort()) + .database(database.getDatabaseName()) + .tableList(database.getDatabaseName() + "." + table) + .username(ORACLE_CONTAINER.getUsername()) + .password(ORACLE_CONTAINER.getPassword()) + .deserializer(new OracleSourceTest.ForwardDeserializeSchema()) + .build(); + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + + // --------------------------------------------------------------------------- + // Step-1: start source + // --------------------------------------------------------------------------- + TestSourceContext sourceContext = new TestSourceContext<>(); + setupSource(source, false, offsetState, historyState, true, 0, 1); + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + drain(sourceContext, expectedRecordCount); + + // --------------------------------------------------------------------------- + // Step-2: trigger checkpoint-1 after snapshot finished + // --------------------------------------------------------------------------- + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + String offsetFile = JsonPath.read(state, "$.sourceOffset.file"); + int offsetPos = JsonPath.read(state, "$.sourceOffset.pos"); + + source.cancel(); + source.close(); + runThread.sync(); + + return Tuple2.of(offsetFile, offsetPos); + } + + // ------------------------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------------------------ + + private DebeziumSourceFunction createOracleLogminerSource() { + return basicSourceBuilder().build(); + } + + private OracleSource.Builder basicSourceBuilder() { + + return OracleSource.builder() + .hostname(ORACLE_CONTAINER.getHost()) + .port(ORACLE_CONTAINER.getOraclePort()) + .database("XE") + .tableList( + database.getDatabaseName() + "." + "products") // monitor table "products" + .username(ORACLE_CONTAINER.getUsername()) + .password(ORACLE_CONTAINER.getPassword()) + .deserializer(new ForwardDeserializeSchema()); + } + + private static List drain(TestSourceContext sourceContext, int expectedRecordCount) + throws Exception { + List allRecords = new ArrayList<>(); + LinkedBlockingQueue> queue = sourceContext.getCollectedOutputs(); + while (allRecords.size() < expectedRecordCount) { + StreamRecord record = queue.poll(100, TimeUnit.SECONDS); + if (record != null) { + allRecords.add(record.getValue()); + } else { + throw new RuntimeException( + "Can't receive " + expectedRecordCount + " elements before timeout."); + } + } + + return allRecords; + } + + private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout) + throws Exception { + final Semaphore semaphore = new Semaphore(0); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute( + () -> { + synchronized (checkpointLock) { + semaphore.release(); + } + }); + boolean result = semaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS); + executor.shutdownNow(); + return result; + } + + /** + * Wait for a maximum amount of time until the first record is available. + * + * @param timeout the maximum amount of time to wait; must not be negative + * @return {@code true} if records are available, or {@code false} if the timeout occurred and + * no records are available + */ + private boolean waitForAvailableRecords(Duration timeout, TestSourceContext sourceContext) + throws InterruptedException { + long now = System.currentTimeMillis(); + long stop = now + timeout.toMillis(); + while (System.currentTimeMillis() < stop) { + if (!sourceContext.getCollectedOutputs().isEmpty()) { + break; + } + Thread.sleep(10); // save CPU + } + return !sourceContext.getCollectedOutputs().isEmpty(); + } + + private static void setupSource(DebeziumSourceFunction source) throws Exception { + setupSource( + source, false, null, null, + true, // enable checkpointing; auto commit should be ignored + 0, 1); + } + + private static void setupSource( + DebeziumSourceFunction source, + boolean isRestored, + ListState restoredOffsetState, + ListState restoredHistoryState, + boolean isCheckpointingEnabled, + int subtaskIndex, + int totalNumSubtasks) + throws Exception { + + // run setup procedure in operator life cycle + source.setRuntimeContext( + new MockStreamingRuntimeContext( + isCheckpointingEnabled, totalNumSubtasks, subtaskIndex)); + source.initializeState( + new MockFunctionInitializationContext( + isRestored, + new MockOperatorStateStore(restoredOffsetState, restoredHistoryState))); + source.open(new Configuration()); + } + + /** + * A simple implementation of {@link DebeziumDeserializationSchema} which just forward the + * {@link SourceRecord}. + */ + public static class ForwardDeserializeSchema + implements DebeziumDeserializationSchema { + + private static final long serialVersionUID = 2975058057832211228L; + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + out.collect(record); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(SourceRecord.class); + } + } + + private static class MockOperatorStateStore implements OperatorStateStore { + + private final ListState restoredOffsetListState; + private final ListState restoredHistoryListState; + + private MockOperatorStateStore( + ListState restoredOffsetListState, ListState restoredHistoryListState) { + this.restoredOffsetListState = restoredOffsetListState; + this.restoredHistoryListState = restoredHistoryListState; + } + + @Override + @SuppressWarnings("unchecked") + public ListState getUnionListState(ListStateDescriptor stateDescriptor) + throws Exception { + if (stateDescriptor.getName().equals(DebeziumSourceFunction.OFFSETS_STATE_NAME)) { + return (ListState) restoredOffsetListState; + } else if (stateDescriptor + .getName() + .equals(DebeziumSourceFunction.HISTORY_RECORDS_STATE_NAME)) { + return (ListState) restoredHistoryListState; + } else { + throw new IllegalStateException("Unknown state."); + } + } + + @Override + public BroadcastState getBroadcastState( + MapStateDescriptor stateDescriptor) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public ListState getListState(ListStateDescriptor stateDescriptor) + throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public Set getRegisteredStateNames() { + throw new UnsupportedOperationException(); + } + + @Override + public Set getRegisteredBroadcastStateNames() { + throw new UnsupportedOperationException(); + } + } + + private static class MockFunctionInitializationContext + implements FunctionInitializationContext { + + private final boolean isRestored; + private final OperatorStateStore operatorStateStore; + + private MockFunctionInitializationContext( + boolean isRestored, OperatorStateStore operatorStateStore) { + this.isRestored = isRestored; + this.operatorStateStore = operatorStateStore; + } + + @Override + public boolean isRestored() { + return isRestored; + } + + @Override + public OperatorStateStore getOperatorStateStore() { + return operatorStateStore; + } + + @Override + public KeyedStateStore getKeyedStateStore() { + throw new UnsupportedOperationException(); + } + } + + private static class BlockingSourceContext extends TestSourceContext { + + private final Semaphore blocker = new Semaphore(0); + private final int expectedCount; + private int currentCount = 0; + + private BlockingSourceContext(int expectedCount) { + this.expectedCount = expectedCount; + } + + @Override + public void collect(T t) { + super.collect(t); + currentCount++; + if (currentCount == expectedCount) { + try { + // block the source to emit records + blocker.acquire(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + private static final class TestingListState implements ListState { + + private final List list = new ArrayList<>(); + private boolean clearCalled = false; + + @Override + public void clear() { + list.clear(); + clearCalled = true; + } + + @Override + public Iterable get() throws Exception { + return list; + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + list.add(value); + } + + public List getList() { + return list; + } + + boolean isClearCalled() { + return clearCalled; + } + + @Override + public void update(List values) throws Exception { + clear(); + + addAll(values); + } + + @Override + public void addAll(List values) throws Exception { + if (values != null) { + values.forEach( + v -> Preconditions.checkNotNull(v, "You cannot add null to a ListState.")); + + list.addAll(values); + } + } + } +} diff --git a/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/OracleTestBase.java b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/OracleTestBase.java new file mode 100644 index 000000000..fc198a681 --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/OracleTestBase.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle; + +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.OracleContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +/** + * Basic class for testing Oracle logminer source, this contains a Oracle container which enables + * logminer. + */ +public abstract class OracleTestBase extends AbstractTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(OracleTestBase.class); + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + private static final DockerImageName OC_IMAGE = + DockerImageName.parse("thake/oracle-xe-11g").asCompatibleSubstituteFor("oracle"); + + protected static final OracleContainer ORACLE_CONTAINER = + new OracleContainer(OC_IMAGE) + .withClasspathResourceMapping( + "docker/setup.sh", "/etc/logminer_conf.sh", BindMode.READ_WRITE) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @BeforeClass + public static void startContainers() { + LOG.info("Starting containers..."); + + Startables.deepStart(Stream.of(ORACLE_CONTAINER)).join(); + LOG.info("Containers are started."); + } + + protected Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + ORACLE_CONTAINER.getJdbcUrl(), + ORACLE_CONTAINER.getUsername(), + ORACLE_CONTAINER.getPassword()); + } +} diff --git a/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java new file mode 100644 index 000000000..a902f90ab --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; + +import com.alibaba.ververica.cdc.connectors.oracle.OracleTestBase; +import com.alibaba.ververica.cdc.connectors.oracle.utils.UniqueDatabase; +import org.junit.Before; +import org.junit.Test; +import org.testcontainers.containers.Container; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +/** Integration tests for MySQL binlog SQL source. */ +public class OracleConnectorITCase extends OracleTestBase { + + private final UniqueDatabase database = + new UniqueDatabase(ORACLE_CONTAINER, "debezium", "system", "oracle", "inventory_1"); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, + EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()); + + @Before + public void before() throws Exception { + TestValuesTableFactory.clearAllData(); + Container.ExecResult execResult1 = + ORACLE_CONTAINER.execInContainer("chmod", "+x", "/etc/logminer_conf.sh"); + + execResult1.getStdout(); + execResult1.getStderr(); + + // Container.ExecResult execResult12 = ORACLE_CONTAINER.execInContainer("chmod", "-R", + // "777", "/u01/app/oracle/"); + // execResult12 = ORACLE_CONTAINER.execInContainer("su - oracle"); + + // execResult12.getStdout(); + // execResult12.getStderr(); + Container.ExecResult execResult = + ORACLE_CONTAINER.execInContainer("/bin/sh", "-c", "/etc/logminer_conf.sh"); + execResult.getStdout(); + execResult.getStderr(); + env.setParallelism(1); + } + + @Test + public void testConsumingAllEvents() + throws SQLException, ExecutionException, InterruptedException { + // inventoryDatabase.createAndInitialize(); + + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " ID INT NOT NULL," + + " NAME STRING," + + " DESCRIPTION STRING," + + " WEIGHT DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'oracle-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = 'XE'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'" + + ")", + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), + "dbzuser", + "dbz", + database.getDatabaseName(), + "products"); + String sinkDDL = + "CREATE TABLE sink (" + + " name STRING," + + " weightSum DECIMAL(10,3)," + + " PRIMARY KEY (name) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '20'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME"); + + waitForSnapshotStarted("sink"); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106"); + statement.execute("UPDATE debezium.products SET WEIGHT='5.1' WHERE ID=107"); + statement.execute( + "INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)"); // 110 + statement.execute( + "INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)"); + statement.execute( + "UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=111"); + statement.execute("UPDATE debezium.products SET WEIGHT='5.17' WHERE ID=112"); + statement.execute("DELETE FROM debezium.products WHERE ID=112"); + } + + waitForSinkSize("sink", 20); + + // The final database table looks like this: + // + // > SELECT * FROM products; + // +-----+--------------------+---------------------------------------------------------+--------+ + // | id | name | description | + // weight | + // +-----+--------------------+---------------------------------------------------------+--------+ + // | 101 | scooter | Small 2-wheel scooter | + // 3.14 | + // | 102 | car battery | 12V car battery | + // 8.1 | + // | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | + // 0.8 | + // | 104 | hammer | 12oz carpenter's hammer | + // 0.75 | + // | 105 | hammer | 14oz carpenter's hammer | + // 0.875 | + // | 106 | hammer | 18oz carpenter hammer | + // 1 | + // | 107 | rocks | box of assorted rocks | + // 5.1 | + // | 108 | jacket | water resistent black wind breaker | + // 0.1 | + // | 109 | spare tire | 24 inch spare tire | + // 22.2 | + // | 110 | jacket | new water resistent white wind breaker | + // 0.5 | + // +-----+--------------------+---------------------------------------------------------+--------+ + + String[] expected = + new String[] { + "scooter,8.310", + "car battery,8.100", + "12-pack drill bits,0.800", + "hammer,2.625", + "rocks,5.100", + "jacket,1.100", + "spare tire,22.200" + }; + + List actual = TestValuesTableFactory.getResults("sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + /* + @Test + public void testAllTypes() throws Throwable { + //database.createAndInitialize(); + String sourceDDL = + String.format( + "CREATE TABLE full_types (\n" + + " id INT NOT NULL,\n" + + " tiny_c TINYINT,\n" + + " tiny_un_c SMALLINT ,\n" + + " small_c SMALLINT,\n" + + " small_un_c INT,\n" + + " int_c INT ,\n" + + " int_un_c BIGINT,\n" + + " big_c BIGINT,\n" + + " varchar_c STRING,\n" + + " char_c STRING,\n" + + " float_c FLOAT,\n" + + " double_c DOUBLE,\n" + + " decimal_c DECIMAL(8, 4),\n" + + " numeric_c DECIMAL(6, 0),\n" + + " boolean_c BOOLEAN,\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " datetime3_c TIMESTAMP(3),\n" + + " datetime6_c TIMESTAMP(6),\n" + + " timestamp_c TIMESTAMP(0),\n" + + " file_uuid BYTES\n" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'" + + ")", + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), + database.getUsername(), + database.getPassword(), + database.getDatabaseName(), + "full_types"); + String sinkDDL = + "CREATE TABLE sink (\n" + + " id INT NOT NULL,\n" + + " tiny_c TINYINT,\n" + + " tiny_un_c SMALLINT ,\n" + + " small_c SMALLINT,\n" + + " small_un_c INT,\n" + + " int_c INT ,\n" + + " int_un_c BIGINT,\n" + + " big_c BIGINT,\n" + + " varchar_c STRING,\n" + + " char_c STRING,\n" + + " float_c FLOAT,\n" + + " double_c DOUBLE,\n" + + " decimal_c DECIMAL(8, 4),\n" + + " numeric_c DECIMAL(6, 0),\n" + + " boolean_c BOOLEAN,\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " datetime3_c TIMESTAMP(3),\n" + + " datetime6_c TIMESTAMP(6),\n" + + " timestamp_c TIMESTAMP(0),\n" + + " file_uuid STRING\n" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT id,\n" + + "tiny_c,\n" + + "tiny_un_c,\n" + + "small_c,\n" + + "small_un_c,\n" + + "int_c,\n" + + "int_un_c,\n" + + "big_c,\n" + + "varchar_c,\n" + + "char_c,\n" + + "float_c,\n" + + "double_c,\n" + + "decimal_c,\n" + + "numeric_c,\n" + + "boolean_c,\n" + + "date_c,\n" + + "time_c,\n" + + "datetime3_c,\n" + + "datetime6_c,\n" + + "timestamp_c,\n" + + "TO_BASE64(DECODE(file_uuid, 'UTF-8')) FROM full_types"); + + waitForSnapshotStarted("sink"); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;"); + } + + waitForSinkSize("sink", 3); + + List expected = + Arrays.asList( + "+I(1,127,255,32767,65535,2147483647,4294967295,9223372036854775807,Hello World,abc," + + "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123," + + "2020-07-17T18:00:22.123456,2020-07-17T18:00:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)", + "-U(1,127,255,32767,65535,2147483647,4294967295,9223372036854775807,Hello World,abc," + + "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123," + + "2020-07-17T18:00:22.123456,2020-07-17T18:00:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)", + "+U(1,127,255,32767,65535,2147483647,4294967295,9223372036854775807,Hello World,abc," + + "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123," + + "2020-07-17T18:00:22.123456,2020-07-17T18:33:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)"); + List actual = TestValuesTableFactory.getRawResults("sink"); + assertEquals(expected, actual); + + result.getJobClient().get().cancel().get(); + } + + */ + @Test + public void testStartupFromLatestOffset() throws Exception { + // database.createAndInitialize(); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " ID INT NOT NULL," + + " NAME STRING," + + " DESCRIPTION STRING," + + " WEIGHT DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'oracle-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = 'XE'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s' ," + + " 'scan.startup.mode' = 'latest-offset'" + + ")", + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), + "dbzuser", + "dbz", + database.getDatabaseName(), + "products"); + String sinkDDL = + "CREATE TABLE sink " + + " WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ") LIKE debezium_source (EXCLUDING OPTIONS)"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); + // wait for the source startup, we don't have a better way to wait it, use sleep for now + Thread.sleep(5000L); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "INSERT INTO debezium.products VALUES (110,'jacket','water resistent white wind breaker',0.2)"); // 110 + statement.execute( + "INSERT INTO debezium.products VALUES (111,'scooter','Big 2-wheel scooter ',5.18)"); + statement.execute( + "UPDATE debezium.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110"); + statement.execute("UPDATE debezium.products SET weight='5.17' WHERE id=111"); + statement.execute("DELETE FROM debezium.products WHERE id=111"); + } + + waitForSinkSize("sink", 7); + + String[] expected = + new String[] {"110,jacket,new water resistent white wind breaker,0.500"}; + + List actual = TestValuesTableFactory.getResults("sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + + // ------------------------------------------------------------------------------------ + + private static void waitForSnapshotStarted(String sinkName) throws InterruptedException { + while (sinkSize(sinkName) == 0) { + Thread.sleep(100); + } + } + + private static void waitForSinkSize(String sinkName, int expectedSize) + throws InterruptedException { + while (sinkSize(sinkName) < expectedSize) { + Thread.sleep(100); + } + } + + private static int sinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getRawResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } +} diff --git a/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java new file mode 100644 index 000000000..ae9bab430 --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test for {@link com.alibaba.ververica.cdc.connectors.oracle.table.OracleTableSource} created by + * {@link com.alibaba.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory}. + */ +public class OracleTableSourceFactoryTest { + private static final TableSchema SCHEMA = + TableSchema.builder() + .field("aaa", DataTypes.INT().notNull()) + .field("bbb", DataTypes.STRING().notNull()) + .field("ccc", DataTypes.DOUBLE()) + .field("ddd", DataTypes.DECIMAL(31, 18)) + .field("eee", DataTypes.TIMESTAMP(3)) + .primaryKey("bbb", "aaa") + .build(); + + private static final String MY_LOCALHOST = "localhost"; + private static final String MY_USERNAME = "flinkuser"; + private static final String MY_PASSWORD = "flinkpw"; + private static final String MY_DATABASE = "myDB"; + private static final String MY_TABLE = "myTable"; + private static final String MY_SCHEMA = "mySchema"; + private static final Properties PROPERTIES = new Properties(); + + @Test + public void testCommonProperties() { + Map properties = getAllOptions(); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties); + OracleTableSource expectedSource = + new OracleTableSource( + TableSchemaUtils.getPhysicalSchema(SCHEMA), + 1521, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_SCHEMA, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("UTC"), + PROPERTIES, + StartupOptions.initial()); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testOptionalProperties() { + Map options = getAllOptions(); + options.put("port", "1521"); + options.put("server-time-zone", "Asia/Shanghai"); + options.put("debezium.snapshot.mode", "initial"); + + DynamicTableSource actualSource = createTableSource(options); + Properties dbzProperties = new Properties(); + dbzProperties.put("snapshot.mode", "initial"); + OracleTableSource expectedSource = + new OracleTableSource( + TableSchemaUtils.getPhysicalSchema(SCHEMA), + 1521, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_SCHEMA, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("Asia/Shanghai"), + dbzProperties, + StartupOptions.initial()); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testStartupFromInitial() { + Map properties = getAllOptions(); + properties.put("scan.startup.mode", "initial"); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties); + OracleTableSource expectedSource = + new OracleTableSource( + TableSchemaUtils.getPhysicalSchema(SCHEMA), + 1521, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_SCHEMA, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("UTC"), + PROPERTIES, + StartupOptions.initial()); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testStartupFromLatestOffset() { + Map properties = getAllOptions(); + properties.put("scan.startup.mode", "latest-offset"); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties); + OracleTableSource expectedSource = + new OracleTableSource( + TableSchemaUtils.getPhysicalSchema(SCHEMA), + 1521, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_SCHEMA, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("UTC"), + PROPERTIES, + StartupOptions.latest()); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testValidation() { + // validate illegal port + try { + Map properties = getAllOptions(); + properties.put("port", "123b"); + + createTableSource(properties); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage( + t, "Could not parse value '123b' for key 'port'.") + .isPresent()); + } + + // validate missing required + Factory factory = new OracleTableSourceFactory(); + for (ConfigOption requiredOption : factory.requiredOptions()) { + Map properties = getAllOptions(); + properties.remove(requiredOption.key()); + + try { + createTableSource(properties); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage( + t, + "Missing required options are:\n\n" + requiredOption.key()) + .isPresent()); + } + } + + // validate unsupported option + try { + Map properties = getAllOptions(); + properties.put("unknown", "abc"); + + createTableSource(properties); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage(t, "Unsupported options:\n\nunknown") + .isPresent()); + } + + // validate unsupported option + try { + Map properties = getAllOptions(); + properties.put("scan.startup.mode", "abc"); + + createTableSource(properties); + fail("exception expected"); + } catch (Throwable t) { + String msg = + "Invalid value for option 'scan.startup.mode'. Supported values are " + + "[initial, latest-offset], " + + "but was: abc"; + + assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent()); + } + } + + private Map getAllOptions() { + Map options = new HashMap<>(); + options.put("connector", "oracle-cdc"); + options.put("hostname", MY_LOCALHOST); + options.put("database-name", MY_DATABASE); + options.put("table-name", MY_TABLE); + options.put("username", MY_USERNAME); + options.put("password", MY_PASSWORD); + options.put("schema-name", MY_SCHEMA); + return options; + } + + private static DynamicTableSource createTableSource(Map options) { + return FactoryUtil.createTableSource( + null, + ObjectIdentifier.of("default", "default", "t1"), + new CatalogTableImpl(SCHEMA, options, "mock source"), + new Configuration(), + OracleTableSourceFactoryTest.class.getClassLoader(), + false); + } +} diff --git a/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/utils/UniqueDatabase.java b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/utils/UniqueDatabase.java new file mode 100644 index 000000000..0b7f27392 --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/oracle/utils/UniqueDatabase.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle.utils; + +import org.testcontainers.containers.OracleContainer; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertNotNull; + +/** + * Create and populate a unique instance of a Oracle database for each run of JUnit test. A user of + * class needs to provide a logical name for Debezium and database name. It is expected that there + * is a init file in src/test/resources/ddl/<database_name>.sql. The database + * name is enriched with a unique suffix that guarantees complete isolation between runs + * <database_name>_<suffix> + * + *

This class is inspired from Debezium project. + */ +public class UniqueDatabase { + + private static final String[] CREATE_DATABASE_DDL = + new String[] {"CREATE DATABASE $DBNAME$;", "USE $DBNAME$;"}; + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + + private final OracleContainer container; + private final String databaseName; + private final String templateName; + private final String username; + private final String password; + private final String schemaName = "debezium"; + + public UniqueDatabase( + OracleContainer container, + String databaseName, + String username, + String password, + String sqlName) { + this( + container, + databaseName, + Integer.toUnsignedString(new Random().nextInt(), 36), + username, + password, + sqlName); + } + + private UniqueDatabase( + OracleContainer container, + String databaseName, + final String identifier, + String username, + String password, + String sqlName) { + this.container = container; + this.databaseName = databaseName; // + "_" + identifier; + this.templateName = sqlName; + this.username = username; + this.password = password; + } + + public String getDatabaseName() { + return databaseName; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + /** @return Fully qualified table name <databaseName>.<tableName> */ + public String qualifiedTableName(final String tableName) { + return String.format("%s.%s", databaseName, tableName); + } + + /** Creates the database and populates it with initialization SQL script. */ + public void createAndInitialize() { + final String ddlFile = String.format("ddl/%s.sql", templateName); + final URL ddlTestFile = UniqueDatabase.class.getClassLoader().getResource(ddlFile); + assertNotNull("Cannot locate " + ddlFile, ddlTestFile); + try { + try (Connection connection = + DriverManager.getConnection( + container.getJdbcUrl(), username, password); + Statement statement = connection.createStatement()) { + final List statements = + Arrays.stream( + Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ? m.group(1) : x; + }) + .map(this::convertSQL) + .collect(Collectors.joining("\n")) + .split(";")) + .map(x -> x.replace("$$", ";")) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } + } catch (final Exception e) { + throw new IllegalStateException(e); + } + } + + public Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection(container.getJdbcUrl(), username, password); + } + + private String convertSQL(final String sql) { + return sql.replace("$DBNAME$", databaseName); + } +} diff --git a/flink-connector-oracle-cdc/src/test/resources/ddl/column_type_test.sql b/flink-connector-oracle-cdc/src/test/resources/ddl/column_type_test.sql new file mode 100644 index 000000000..be59d79d8 --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/resources/ddl/column_type_test.sql @@ -0,0 +1,50 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: column_type_test +-- ---------------------------------------------------------------------------------------------------------------- + +CREATE TABLE full_types ( + id INT AUTO_INCREMENT NOT NULL, + tiny_c TINYINT, + tiny_un_c TINYINT UNSIGNED, + small_c SMALLINT, + small_un_c SMALLINT UNSIGNED, + int_c INTEGER , + int_un_c INTEGER UNSIGNED, + big_c BIGINT, + varchar_c VARCHAR(255), + char_c CHAR(3), + float_c FLOAT, + double_c DOUBLE, + decimal_c DECIMAL(8, 4), + numeric_c NUMERIC(6, 0), + boolean_c BOOLEAN, + date_c DATE, + time_c TIME(0), + datetime3_c DATETIME(3), + datetime6_c DATETIME(6), + timestamp_c TIMESTAMP, + file_uuid BINARY(16), + PRIMARY KEY (id) +) DEFAULT CHARSET=utf8; + +INSERT INTO full_types VALUES ( + DEFAULT, 127, 255, 32767, 65535, 2147483647, 4294967295, 9223372036854775807, + 'Hello World', 'abc', 123.102, 404.4443, 123.4567, 345.6, true, + '2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', + unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400','-','')) +); \ No newline at end of file diff --git a/flink-connector-oracle-cdc/src/test/resources/ddl/inventory.sql b/flink-connector-oracle-cdc/src/test/resources/ddl/inventory.sql new file mode 100644 index 000000000..2f1ff32ad --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/resources/ddl/inventory.sql @@ -0,0 +1,93 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: inventory +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate our products using a single insert with many rows +CREATE TABLE HR.products ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description VARCHAR(512), + weight FLOAT +); +ALTER TABLE HR.products AUTO_INCREMENT = 101; + +INSERT INTO HR.products +VALUES (default,"scooter","Small 2-wheel scooter",3.14), + (default,"car battery","12V car battery",8.1), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8), + (default,"hammer","12oz carpenter's hammer",0.75), + (default,"hammer","14oz carpenter's hammer",0.875), + (default,"hammer","16oz carpenter's hammer",1.0), + (default,"rocks","box of assorted rocks",5.3), + (default,"jacket","water resistent black wind breaker",0.1), + (default,"spare tire","24 inch spare tire",22.2); + +-- Create and populate the products on hand using multiple inserts +CREATE TABLE HR.products_on_hand ( + product_id INTEGER NOT NULL PRIMARY KEY, + quantity INTEGER NOT NULL, + FOREIGN KEY (product_id) REFERENCES products(id) +); + +INSERT INTO products_on_hand VALUES (101,3); +INSERT INTO products_on_hand VALUES (102,8); +INSERT INTO products_on_hand VALUES (103,18); +INSERT INTO products_on_hand VALUES (104,4); +INSERT INTO products_on_hand VALUES (105,5); +INSERT INTO products_on_hand VALUES (106,0); +INSERT INTO products_on_hand VALUES (107,44); +INSERT INTO products_on_hand VALUES (108,2); +INSERT INTO products_on_hand VALUES (109,5); + +-- Create some customers ... +CREATE TABLE HR.customers ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + first_name VARCHAR(255) NOT NULL, + last_name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL UNIQUE KEY +) AUTO_INCREMENT=1001; + + +INSERT INTO HR.customers +VALUES (default,"Sally","Thomas","sally.thomas@acme.com"), + (default,"George","Bailey","gbailey@foobar.com"), + (default,"Edward","Walker","ed@walker.com"), + (default,"Anne","Kretchmar","annek@noanswer.org"); + +-- Create some very simple orders +CREATE TABLE HR.orders ( + order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + order_date DATE NOT NULL, + purchaser INTEGER NOT NULL, + quantity INTEGER NOT NULL, + product_id INTEGER NOT NULL, + FOREIGN KEY order_customer (purchaser) REFERENCES customers(id), + FOREIGN KEY ordered_product (product_id) REFERENCES products(id) +) AUTO_INCREMENT = 10001; + +INSERT INTO HR.orders +VALUES (default, '2016-01-16', 1001, 1, 102), + (default, '2016-01-17', 1002, 2, 105), + (default, '2016-02-18', 1004, 3, 109), + (default, '2016-02-19', 1002, 2, 106), + (default, '16-02-21', 1003, 1, 107); + +CREATE TABLE HR.category ( + id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + category_name VARCHAR(255) +); \ No newline at end of file diff --git a/flink-connector-oracle-cdc/src/test/resources/ddl/inventory_1.sql b/flink-connector-oracle-cdc/src/test/resources/ddl/inventory_1.sql new file mode 100644 index 000000000..3578a942c --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/resources/ddl/inventory_1.sql @@ -0,0 +1,48 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: inventory +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate our products using a single insert with many rows +CREATE TABLE HR.products ( + id INTEGER NOT NULL, + name VARCHAR(255) NOT NULL, + description VARCHAR(512), + weight FLOAT, + PRIMARY KEY(id) +); +ALTER TABLE HR.products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; +-- Auto-generated SQL script #202106141039 + +INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (101,'scooter','Small 2-wheel scooter',3.14); +INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (102,'car battery','12V car battery',8.1); +INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8); +INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (104,'hammer','12oz carpenters hammer',0.75); +INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (105,'hammer','14oz carpenters hammer',0.875); +INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (106,'hammer','16oz carpenters hammer',1.0); +INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (107,'rocks','box of assorted rocks',5.3); +INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (108,'jacket','water resistent black wind breaker',0.1); +INSERT INTO HR.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (109,'spare tire','24 inch spare tire',22.2); \ No newline at end of file diff --git a/flink-connector-oracle-cdc/src/test/resources/docker/setup.sh b/flink-connector-oracle-cdc/src/test/resources/docker/setup.sh new file mode 100644 index 000000000..144e25fab --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/resources/docker/setup.sh @@ -0,0 +1,114 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +#!/bin/sh + +mkdir /u01/app/oracle/oradata/recovery_area + +# Set archive log mode and enable GG replication +ORACLE_SID=XE +export ORACLE_SID +#echo 'export PATH=$ORACLE_HOME/bin:$PATH' >> /etc/bash.bashrc +export ORACLE_HOME=/u01/app/oracle/product/11.2.0/xe + + +# Create Log Miner Tablespace and User +/u01/app/oracle/product/11.2.0/xe/bin/sqlplus sys/oracle@//localhost:1521/XE as sysdba <<- EOF + CREATE TABLESPACE LOGMINER_TBS DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; + exit; +EOF + +/u01/app/oracle/product/11.2.0/xe/bin/sqlplus sys/oracle@//localhost:1521/XE as sysdba <<- EOF + CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS; + + + GRANT CREATE SESSION TO dbzuser; + GRANT SELECT ON V_$DATABASE TO dbzuser; + GRANT FLASHBACK ANY TABLE TO dbzuser; + GRANT SELECT ANY TABLE TO dbzuser; + GRANT SELECT_CATALOG_ROLE TO dbzuser; + GRANT EXECUTE_CATALOG_ROLE TO dbzuser; + GRANT SELECT ANY TRANSACTION TO dbzuser; + GRANT SELECT ANY DICTIONARY TO dbzuser; + + GRANT CREATE TABLE TO dbzuser; + GRANT ALTER ANY TABLE TO dbzuser; + GRANT LOCK ANY TABLE TO dbzuser; + GRANT CREATE SEQUENCE TO dbzuser; + + GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser; + GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser; + GRANT SELECT ON V_$LOGMNR_LOGS to dbzuser; + GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser; + GRANT SELECT ON V_$LOGFILE TO dbzuser; + GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser; + GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser; + GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbzuser; + GRANT SELECT ON V_$LOG TO dbzuser; + GRANT SELECT ON V_$LOG_HISTORY TO dbzuser; + + exit; +EOF + +/u01/app/oracle/product/11.2.0/xe/bin/sqlplus sys/oracle@//localhost:1521/XE as sysdba <<- EOF + CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS; + GRANT CONNECT TO debezium; + GRANT CREATE SESSION TO debezium; + GRANT CREATE TABLE TO debezium; + GRANT CREATE SEQUENCE to debezium; + ALTER USER debezium QUOTA 100M on users; + exit; +EOF + +/u01/app/oracle/product/11.2.0/xe/bin/sqlplus sys/oracle@//localhost:1521/XE as sysdba <<- EOF + + CREATE TABLE debezium.products ( + ID INTEGER NOT NULL, + NAME VARCHAR(255) NOT NULL, + DESCRIPTION VARCHAR(512), + WEIGHT FLOAT, + PRIMARY KEY(ID) + ); + CREATE TABLE debezium.category ( + ID INTEGER NOT NULL, + CATEGORY_NAME VARCHAR(255), + PRIMARY KEY(ID) + ); + ALTER TABLE debezium.products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + ALTER TABLE debezium.category ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + + INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (101,'scooter','Small 2-wheel scooter',3.14); + INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (102,'car battery','12V car battery',8.1); + INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8); + INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (104,'hammer','12oz carpenters hammer',0.75); + INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (105,'hammer','14oz carpenters hammer',0.875); + INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (106,'hammer','16oz carpenters hammer',1.0); + INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (107,'rocks','box of assorted rocks',5.3); + INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (108,'jacket','water resistent black wind breaker',0.1); + INSERT INTO debezium.PRODUCTS (ID,NAME,DESCRIPTION,WEIGHT) + VALUES (109,'spare tire','24 inch spare tire',22.2); + exit; +EOF diff --git a/flink-connector-oracle-cdc/src/test/resources/log4j2-test.properties b/flink-connector-oracle-cdc/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..b82a9606d --- /dev/null +++ b/flink-connector-oracle-cdc/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/flink-sql-connector-oracle-cdc/pom.xml b/flink-sql-connector-oracle-cdc/pom.xml new file mode 100644 index 000000000..2b0c8700c --- /dev/null +++ b/flink-sql-connector-oracle-cdc/pom.xml @@ -0,0 +1,78 @@ + + + + + flink-cdc-connectors + com.alibaba.ververica + 1.3-SNAPSHOT + + 4.0.0 + + flink-sql-connector-oracle-cdc + + + + com.alibaba.ververica + flink-connector-oracle-cdc + ${project.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + shade-flink + package + + shade + + + false + + + *:* + + + + + org.apache.kafka:* + + kafka/kafka-version.properties + LICENSE + + NOTICE + common/** + + + + + + + + + + diff --git a/flink-sql-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/DummyDocs.java b/flink-sql-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/DummyDocs.java new file mode 100644 index 000000000..9d15a4cd3 --- /dev/null +++ b/flink-sql-connector-oracle-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/oracle/DummyDocs.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.oracle; + +/** This is used to generate a dummy docs jar for this module to pass OSS repository rule. */ +public class DummyDocs {} diff --git a/flink-sql-connector-oracle-cdc/src/main/resources/META-INF/NOTICE b/flink-sql-connector-oracle-cdc/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000..420bf5931 --- /dev/null +++ b/flink-sql-connector-oracle-cdc/src/main/resources/META-INF/NOTICE @@ -0,0 +1,6 @@ +flink-sql-connector-oracle-cdc +Copyright 2020 Ververica Inc. + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- org.apache.kafka:kafka-clients:2.5.0 diff --git a/pom.xml b/pom.xml index c5ae2d12a..bc290947f 100644 --- a/pom.xml +++ b/pom.xml @@ -37,10 +37,13 @@ under the License. flink-connector-test-util flink-connector-mysql-cdc flink-connector-postgres-cdc + flink-connector-oracle-cdc flink-connector-mongodb-cdc flink-sql-connector-mysql-cdc flink-sql-connector-postgres-cdc flink-sql-connector-mongodb-cdc + flink-sql-connector-postgres-cdc + flink-sql-connector-oracle-cdc flink-format-changelog-json @@ -508,4 +511,4 @@ under the License. - \ No newline at end of file +