diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index 6bc7a8a55..56573736b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -42,7 +42,7 @@ import java.util.Map;
import static com.ververica.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;
-/** Copied from {@link AlterTableParserListener} in Debezium 1.9.7.Final. */
+/** Copied from {@link AlterTableParserListener} in Debezium 1.9.8.Final. */
public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private static final int STARTING_INDEX = 1;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java
index 089a81208..be99d3653 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java
@@ -48,7 +48,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
- * Copied from {@link MySqlAntlrDdlParserListener} in Debezium 1.9.7.final.
+ * Copied from {@link MySqlAntlrDdlParserListener} in Debezium 1.9.8.final.
*
 * <p>This listener's constructor will use some modified listener.
*/
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java
index 6bb18a75c..2b0688a5d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java
@@ -22,7 +22,7 @@ import java.util.Objects;
import java.util.Optional;
/**
- * Copied from Debezium 1.9.7.Final.
+ * Copied from Debezium 1.9.8.Final.
*
 * <p>Base class for {@link ChangeRecordEmitter} implementations based on a relational database.
*
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
index faa4997a8..84f5c230a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
@@ -38,7 +38,7 @@ import java.util.OptionalLong;
import java.util.Properties;
/**
- * Copied from Debezium project(1.9.7.final) to add custom jdbc properties in the jdbc url. The new
+ * Copied from Debezium project(1.9.8.final) to add custom jdbc properties in the jdbc url. The new
* parameter {@code jdbcProperties} in the constructor of {@link MySqlConnectionConfiguration} will
* be used to generate the jdbc url pattern, and may overwrite the default value.
*
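As a rough illustration (not part of this patch) of what "jdbcProperties ... may overwrite the default value" means when the URL pattern is built: user-supplied properties are layered over the connector defaults before the JDBC URL is formatted. The class name, default keys, and the formatJdbcUrl helper below are assumptions for the sketch, not the connector's actual API.

import java.util.Properties;
import java.util.stream.Collectors;

class JdbcUrlSketch {
    static String formatJdbcUrl(String host, int port, Properties userProps) {
        Properties props = new Properties();
        // connector defaults (illustrative keys)
        props.setProperty("useSSL", "false");
        props.setProperty("connectTimeout", "30000");
        // user-supplied jdbcProperties overwrite the defaults
        props.putAll(userProps);
        String query =
                props.stringPropertyNames().stream()
                        .sorted()
                        .map(k -> k + "=" + props.getProperty(k))
                        .collect(Collectors.joining("&"));
        return "jdbc:mysql://" + host + ":" + port + "/?" + query;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("useSSL", "true"); // overrides the connector default
        System.out.println(formatJdbcUrl("localhost", 3306, user));
        // -> jdbc:mysql://localhost:3306/?connectTimeout=30000&useSSL=true
    }
}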
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDefaultValueConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDefaultValueConverter.java
index 674fa68f7..4f6869633 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDefaultValueConverter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDefaultValueConverter.java
@@ -35,8 +35,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
- * Copied from Debezium project(1.9.7.final) to add BIGINT and SMALLINT to TRIM_DATA_TYPES. Remove
- * this when https://issues.redhat.com/browse/DBZ-6824 is fixed.
+ * Copied from Debezium project(1.9.8.final) to add BIGINT and SMALLINT to TRIM_DATA_TYPES. Remove
+ * this once https://issues.redhat.com/browse/DBZ-6824 (fixed in Debezium 2.3.3.Final) is picked up.
*
 * <p>Line 81 & 82: add BIGINT and SMALLINT to TRIM_DATA_TYPES.
*/
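The point of extending TRIM_DATA_TYPES is that DDL default values for these integer types can carry trailing whitespace, which breaks numeric parsing unless trimmed first. A minimal, self-contained sketch of that idea (the set contents and the normalizeDefault helper are assumptions; Debezium's real set is keyed differently):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class DefaultValueTrimSketch {
    private static final Set<String> TRIM_DATA_TYPES =
            new HashSet<>(Arrays.asList("TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT"));

    static String normalizeDefault(String dataType, String defaultValue) {
        if (defaultValue != null && TRIM_DATA_TYPES.contains(dataType.toUpperCase())) {
            return defaultValue.trim(); // e.g. "10 " -> "10", so Long.parseLong succeeds
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        System.out.println(Long.parseLong(normalizeDefault("BIGINT", "10 "))); // 10
    }
}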
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index a35aa93da..31de3468d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -84,7 +84,7 @@ import java.util.function.Predicate;
import static io.debezium.util.Strings.isNullOrEmpty;
/**
- * Copied from Debezium project to fix
+ * Copied from Debezium project(1.9.8.Final) to fix
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
*
 * <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously
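The GTID adjustment is easiest to picture as merging the restored offset's GTID set with the server's: ranges already acknowledged in the offset win, while ranges known only to the server are carried over. A toy sketch under strong simplifying assumptions (one range per server UUID, plain string maps); the real code operates on GTID set objects from Debezium/mysql-binlog-connector and is more involved.

import java.util.LinkedHashMap;
import java.util.Map;

class GtidMergeSketch {
    /** uuid -> "start-end" interval, simplified to a single range per UUID. */
    static Map<String, String> merge(Map<String, String> offsetGtids, Map<String, String> serverGtids) {
        Map<String, String> merged = new LinkedHashMap<>(serverGtids);
        merged.putAll(offsetGtids); // ranges present in the restored offset take precedence
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> offset = Map.of("uuid-a", "1-100");
        Map<String, String> server = Map.of("uuid-a", "1-250", "uuid-b", "1-30");
        // uuid-a keeps 1-100 from the offset; uuid-b=1-30 is carried over from the server
        System.out.println(merge(offset, server));
    }
}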
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java
index 686e9722c..421dae75f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java
@@ -48,8 +48,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
- * Copied from Debezium project to fix FLOAT converted to FLOAT64 type issue. See DBZ-3865,
- * DBZ-5843. Remove it when debezium version is upgraded above 2.0.0.Final.
+ * Copied from Debezium project(1.9.8.Final) to fix FLOAT converted to FLOAT64 type issue. See
+ * DBZ-3865, DBZ-5843. Remove it when debezium version is upgraded above 2.0.0.Final.
*
 * <p>Line 240 & 246: add FLOAT type adjustment.
*
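The FLOAT adjustment is easier to follow with MySQL's precision rule spelled out: FLOAT(p) with p <= 24 (or no precision) is a 4-byte float, so it should surface as FLOAT32 rather than being widened to FLOAT64. A minimal sketch of that rule; the CdcType enum and mapMySqlFloat helper are assumptions, not the converter's actual API.

class FloatMappingSketch {
    enum CdcType { FLOAT32, FLOAT64 }

    static CdcType mapMySqlFloat(Integer precision) {
        // MySQL treats FLOAT(p) with p <= 24 as single precision and p > 24 as double precision.
        if (precision == null || precision <= 24) {
            return CdcType.FLOAT32;
        }
        return CdcType.FLOAT64;
    }

    public static void main(String[] args) {
        System.out.println(mapMySqlFloat(null)); // FLOAT32
        System.out.println(mapMySqlFloat(30));   // FLOAT64
    }
}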
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/antlr/listener/DefaultValueParserListener.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/antlr/listener/DefaultValueParserListener.java
index d96a7a5eb..8a956123e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/antlr/listener/DefaultValueParserListener.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/antlr/listener/DefaultValueParserListener.java
@@ -13,11 +13,11 @@ import io.debezium.relational.ColumnEditor;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Copied from Debezium project(v1.9.7.Final) to fix
+ * Copied from Debezium project(v1.9.8.Final) to fix
* https://github.com/ververica/flink-cdc-connectors/issues/1506.
*
 * <p>Line 48~59: use the actual default string value when the sql contains COLLATE. We should
- * remove this class after we bumped a higher debezium version where the
+ * remove this class after we bump Debezium to 2.0, where
* https://issues.redhat.com/browse/DBZ-5587 has been fixed.
*/
public class DefaultValueParserListener extends MySqlParserBaseListener {
@@ -95,7 +95,10 @@ public class DefaultValueParserListener extends MySqlParserBaseListener {
}
private String unquote(String stringLiteral) {
- return stringLiteral.substring(1, stringLiteral.length() - 1);
+ if (stringLiteral != null && stringLiteral.startsWith("'") && stringLiteral.endsWith("'")) {
+ return stringLiteral.substring(1, stringLiteral.length() - 1);
+ }
+ return stringLiteral;
}
private String unquoteBinary(String stringLiteral) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java
deleted file mode 100644
index 5fd6b07cd..000000000
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java
+++ /dev/null
@@ -1,640 +0,0 @@
-/*
- * Copyright 2023 Ververica Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.debezium.connector.oracle;
-
-import io.debezium.DebeziumException;
-import io.debezium.config.Field;
-import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter;
-import io.debezium.jdbc.JdbcConfiguration;
-import io.debezium.jdbc.JdbcConnection;
-import io.debezium.relational.Column;
-import io.debezium.relational.ColumnEditor;
-import io.debezium.relational.TableId;
-import io.debezium.relational.Tables;
-import io.debezium.relational.Tables.ColumnNameFilter;
-import io.debezium.util.Strings;
-import oracle.jdbc.OracleTypes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Clob;
-import java.sql.DatabaseMetaData;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.time.OffsetDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-/**
- * Copied from Debezium 1.9.7.Final.
- *
- * <p>Line 213-225: Fixed for DBZ-5738.
- */
-public class OracleConnection extends JdbcConnection {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnection.class);
-
- /** Returned by column metadata in Oracle if no scale is set; */
- private static final int ORACLE_UNSET_SCALE = -127;
-
- /** Pattern to identify system generated indices and column names. */
- private static final Pattern SYS_NC_PATTERN =
- Pattern.compile("^SYS_NC(?:_OID|_ROWINFO|[0-9][0-9][0-9][0-9][0-9])\\$$");
-
- /** Pattern to identify abstract data type indices and column names. */
- private static final Pattern ADT_INDEX_NAMES_PATTERN = Pattern.compile("^\".*\"\\.\".*\".*");
-
- /**
- * Pattern to identify a hidden column based on redefining a table with the {@code ROWID}
- * option.
- */
- private static final Pattern MROW_PATTERN = Pattern.compile("^M_ROW\\$\\$");
-
- /** A field for the raw jdbc url. This field has no default value. */
- private static final Field URL = Field.create("url", "Raw JDBC url");
-
- /** The database version. */
- private final OracleDatabaseVersion databaseVersion;
-
- private static final String QUOTED_CHARACTER = "\"";
-
- public OracleConnection(JdbcConfiguration config, Supplier<ClassLoader> classLoaderSupplier) {
- this(config, classLoaderSupplier, true);
- }
-
- public OracleConnection(
- JdbcConfiguration config,
- Supplier<ClassLoader> classLoaderSupplier,
- boolean showVersion) {
- super(
- config,
- resolveConnectionFactory(config),
- classLoaderSupplier,
- QUOTED_CHARACTER,
- QUOTED_CHARACTER);
- this.databaseVersion = resolveOracleDatabaseVersion();
- if (showVersion) {
- LOGGER.info("Database Version: {}", databaseVersion.getBanner());
- }
- }
-
- public void setSessionToPdb(String pdbName) {
- Statement statement = null;
-
- try {
- statement = connection().createStatement();
- statement.execute("alter session set container=" + pdbName);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } finally {
- if (statement != null) {
- try {
- statement.close();
- } catch (SQLException e) {
- LOGGER.error("Couldn't close statement", e);
- }
- }
- }
- }
-
- public void resetSessionToCdb() {
- Statement statement = null;
-
- try {
- statement = connection().createStatement();
- statement.execute("alter session set container=cdb$root");
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } finally {
- if (statement != null) {
- try {
- statement.close();
- } catch (SQLException e) {
- LOGGER.error("Couldn't close statement", e);
- }
- }
- }
- }
-
- public OracleDatabaseVersion getOracleVersion() {
- return databaseVersion;
- }
-
- private OracleDatabaseVersion resolveOracleDatabaseVersion() {
- String versionStr;
- try {
- try {
- // Oracle 18.1 introduced BANNER_FULL as the new column rather than BANNER
- // This column uses a different format than the legacy BANNER.
- versionStr =
- queryAndMap(
- "SELECT BANNER_FULL FROM V$VERSION WHERE BANNER_FULL LIKE 'Oracle Database%'",
- (rs) -> {
- if (rs.next()) {
- return rs.getString(1);
- }
- return null;
- });
- } catch (SQLException e) {
- // exception ignored
- if (e.getMessage().contains("ORA-00904: \"BANNER_FULL\"")) {
- LOGGER.debug(
- "BANNER_FULL column not in V$VERSION, using BANNER column as fallback");
- versionStr = null;
- } else {
- throw e;
- }
- }
-
- // For databases prior to 18.1, a SQLException will be thrown due to BANNER_FULL not
- // being a column and
- // this will cause versionStr to remain null, use fallback column BANNER for versions
- // prior to 18.1.
- if (versionStr == null) {
- versionStr =
- queryAndMap(
- "SELECT BANNER FROM V$VERSION WHERE BANNER LIKE 'Oracle Database%'",
- (rs) -> {
- if (rs.next()) {
- return rs.getString(1);
- }
- return null;
- });
- }
- } catch (SQLException e) {
- throw new RuntimeException("Failed to resolve Oracle database version", e);
- }
-
- if (versionStr == null) {
- throw new RuntimeException("Failed to resolve Oracle database version");
- }
-
- return OracleDatabaseVersion.parse(versionStr);
- }
-
- @Override
- public Set<TableId> readTableNames(
- String databaseCatalog,
- String schemaNamePattern,
- String tableNamePattern,
- String[] tableTypes)
- throws SQLException {
-
- Set<TableId> tableIds =
- super.readTableNames(null, schemaNamePattern, tableNamePattern, tableTypes);
-
- return tableIds.stream()
- .map(t -> new TableId(databaseCatalog, t.schema(), t.table()))
- .collect(Collectors.toSet());
- }
-
- /**
- * Retrieves all {@code TableId} in a given database catalog, filtering certain ids that should
- * be omitted from the returned set such as special spatial tables and index-organized tables.
- *
- * @param catalogName the catalog/database name
- * @return set of all table ids for existing table objects
- * @throws SQLException if a database exception occurred
- */
- protected Set<TableId> getAllTableIds(String catalogName) throws SQLException {
- final String query =
- "select owner, table_name from all_tables "
- +
- // filter special spatial tables
- "where table_name NOT LIKE 'MDRT_%' "
- + "and table_name NOT LIKE 'MDRS_%' "
- + "and table_name NOT LIKE 'MDXT_%' "
- +
- // filter index-organized-tables
- "and (table_name NOT LIKE 'SYS_IOT_OVER_%' and IOT_NAME IS NULL) "
- // filter nested tables
- + "and nested = 'NO'"
- // filter parent tables of nested tables
- + "and table_name not in (select PARENT_TABLE_NAME from ALL_NESTED_TABLES)";
-
- Set<TableId> tableIds = new HashSet<>();
- query(
- query,
- (rs) -> {
- while (rs.next()) {
- tableIds.add(new TableId(catalogName, rs.getString(1), rs.getString(2)));
- }
- LOGGER.trace("TableIds are: {}", tableIds);
- });
-
- return tableIds;
- }
-
- // todo replace metadata with something like this
- private ResultSet getTableColumnsInfo(String schemaNamePattern, String tableName)
- throws SQLException {
- String columnQuery =
- "select column_name, data_type, data_length, data_precision, data_scale, default_length, density, char_length from "
- + "all_tab_columns where owner like '"
- + schemaNamePattern
- + "' and table_name='"
- + tableName
- + "'";
-
- PreparedStatement statement = connection().prepareStatement(columnQuery);
- return statement.executeQuery();
- }
-
- // this is much faster, we will use it until full replacement of the metadata usage TODO
- public void readSchemaForCapturedTables(
- Tables tables,
- String databaseCatalog,
- String schemaNamePattern,
- ColumnNameFilter columnFilter,
- boolean removeTablesNotFoundInJdbc,
- Set<TableId> capturedTables)
- throws SQLException {
-
- Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
-
- DatabaseMetaData metadata = connection().getMetaData();
- Map<TableId, List<Column>> columnsByTable = new HashMap<>();
-
- for (TableId tableId : capturedTables) {
- try (ResultSet columnMetadata =
- metadata.getColumns(
- databaseCatalog, schemaNamePattern, tableId.table(), null)) {
- while (columnMetadata.next()) {
- // add all whitelisted columns
- readTableColumn(columnMetadata, tableId, columnFilter)
- .ifPresent(
- column -> {
- columnsByTable
- .computeIfAbsent(tableId, t -> new ArrayList<>())
- .add(column.create());
- });
- }
- }
- }
-
- // Read the metadata for the primary keys ...
- for (Map.Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
- // First get the primary key information, which must be done for *each* table ...
- List<String> pkColumnNames = readPrimaryKeyNames(metadata, tableEntry.getKey());
-
- // Then define the table ...
- List<Column> columns = tableEntry.getValue();
- Collections.sort(columns);
- tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, null);
- }
-
- if (removeTablesNotFoundInJdbc) {
- // Remove any definitions for tables that were not found in the database metadata ...
- tableIdsBefore.removeAll(columnsByTable.keySet());
- tableIdsBefore.forEach(tables::removeTable);
- }
- }
-
- @Override
- protected String resolveCatalogName(String catalogName) {
- final String pdbName = config().getString("pdb.name");
- return (!Strings.isNullOrEmpty(pdbName) ? pdbName : config().getString("dbname"))
- .toUpperCase();
- }
-
- @Override
- public List<String> readTableUniqueIndices(DatabaseMetaData metadata, TableId id)
- throws SQLException {
- return super.readTableUniqueIndices(metadata, id.toDoubleQuoted());
- }
-
- @Override
- public Optional<Timestamp> getCurrentTimestamp() throws SQLException {
- return queryAndMap(
- "SELECT CURRENT_TIMESTAMP FROM DUAL",
- rs -> rs.next() ? Optional.of(rs.getTimestamp(1)) : Optional.empty());
- }
-
- @Override
- protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
- if (columnName != null) {
- return !SYS_NC_PATTERN.matcher(columnName).matches()
- && !ADT_INDEX_NAMES_PATTERN.matcher(columnName).matches()
- && !MROW_PATTERN.matcher(columnName).matches();
- }
- return false;
- }
-
- /**
- * Get the current, most recent system change number.
- *
- * @return the current system change number
- * @throws SQLException if an exception occurred
- * @throws IllegalStateException if the query does not return at least one row
- */
- public Scn getCurrentScn() throws SQLException {
- return queryAndMap(
- "SELECT CURRENT_SCN FROM V$DATABASE",
- (rs) -> {
- if (rs.next()) {
- return Scn.valueOf(rs.getString(1));
- }
- throw new IllegalStateException("Could not get SCN");
- });
- }
-
- /**
- * Generate a given table's DDL metadata.
- *
- * @param tableId table identifier, should never be {@code null}
- * @return generated DDL
- * @throws SQLException if an exception occurred obtaining the DDL metadata
- * @throws NonRelationalTableException the table is not a relational table
- */
- public String getTableMetadataDdl(TableId tableId)
- throws SQLException, NonRelationalTableException {
- try {
- // This table contains all available objects that are considered relational & object
- // based.
- // By querying for TABLE_TYPE is null, we are explicitly confirming what if an entry
- // exists
- // that the table is in-fact a relational table and if the result set is empty, the
- // object
- // is another type, likely an object-based table, in which case we cannot generate DDL.
- final String tableType =
- "SELECT COUNT(1) FROM ALL_ALL_TABLES WHERE OWNER='"
- + tableId.schema()
- + "' AND TABLE_NAME='"
- + tableId.table()
- + "' AND TABLE_TYPE IS NULL";
- if (queryAndMap(tableType, rs -> rs.next() ? rs.getInt(1) : 0) == 0) {
- throw new NonRelationalTableException(
- "Table " + tableId + " is not a relational table");
- }
-
- // The storage and segment attributes aren't necessary
- executeWithoutCommitting(
- "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;");
- executeWithoutCommitting(
- "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;");
- // In case DDL is returned as multiple DDL statements, this allows the parser to parse
- // each separately.
- // This is only critical during streaming as during snapshot the table structure is
- // built from JDBC driver queries.
- executeWithoutCommitting(
- "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SQLTERMINATOR', true); end;");
- return queryAndMap(
- "SELECT dbms_metadata.get_ddl('TABLE','"
- + tableId.table()
- + "','"
- + tableId.schema()
- + "') FROM DUAL",
- rs -> {
- if (!rs.next()) {
- throw new DebeziumException(
- "Could not get DDL metadata for table: " + tableId);
- }
-
- Object res = rs.getObject(1);
- return ((Clob) res).getSubString(1, (int) ((Clob) res).length());
- });
- } finally {
- executeWithoutCommitting(
- "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'DEFAULT'); end;");
- }
- }
-
- /**
- * Get the current connection's session statistic by name.
- *
- * @param name the name of the statistic to be fetched, must not be {@code null}
- * @return the session statistic value, never {@code null}
- * @throws SQLException if an exception occurred obtaining the session statistic value
- */
- public Long getSessionStatisticByName(String name) throws SQLException {
- return queryAndMap(
- "SELECT VALUE FROM v$statname n, v$mystat m WHERE n.name='"
- + name
- + "' AND n.statistic#=m.statistic#",
- rs -> rs.next() ? rs.getLong(1) : 0L);
- }
-
- /**
- * Returns whether the given table exists or not.
- *
- * @param tableName table name, should not be {@code null}
- * @return true if the table exists, false if it does not
- * @throws SQLException if a database exception occurred
- */
- public boolean isTableExists(String tableName) throws SQLException {
- return queryAndMap(
- "SELECT COUNT(1) FROM USER_TABLES WHERE TABLE_NAME = '" + tableName + "'",
- rs -> rs.next() && rs.getLong(1) > 0);
- }
-
- public boolean isTableExists(TableId tableId) throws SQLException {
- return queryAndMap(
- "SELECT COUNT(1) FROM ALL_TABLES WHERE OWNER = '"
- + tableId.schema()
- + "' AND TABLE_NAME = '"
- + tableId.table()
- + "'",
- rs -> rs.next() && rs.getLong(1) > 0);
- }
-
- /**
- * Returns whether the given table is empty or not.
- *
- * @param tableName table name, should not be {@code null}
- * @return true if the table has no records, false otherwise
- * @throws SQLException if a database exception occurred
- */
- public boolean isTableEmpty(String tableName) throws SQLException {
- return getRowCount(tableName) == 0L;
- }
-
- /**
- * Returns the number of rows in a given table.
- *
- * @param tableName table name, should not be {@code null}
- * @return the number of rows
- * @throws SQLException if a database exception occurred
- */
- public long getRowCount(String tableName) throws SQLException {
- return queryAndMap(
- "SELECT COUNT(1) FROM " + tableName,
- rs -> {
- if (rs.next()) {
- return rs.getLong(1);
- }
- return 0L;
- });
- }
-
- public <T> T singleOptionalValue(String query, ResultSetExtractor<T> extractor)
- throws SQLException {
- return queryAndMap(query, rs -> rs.next() ? extractor.apply(rs) : null);
- }
-
- @Override
- public String buildSelectWithRowLimits(
- TableId tableId,
- int limit,
- String projection,
- Optional<String> condition,
- String orderBy) {
- final TableId table = new TableId(null, tableId.schema(), tableId.table());
- final StringBuilder sql = new StringBuilder("SELECT ");
- sql.append(projection).append(" FROM ");
- sql.append(quotedTableIdString(table));
- if (condition.isPresent()) {
- sql.append(" WHERE ").append(condition.get());
- }
- if (getOracleVersion().getMajor() < 12) {
- sql.insert(0, " SELECT * FROM (")
- .append(" ORDER BY ")
- .append(orderBy)
- .append(")")
- .append(" WHERE ROWNUM <=")
- .append(limit);
- } else {
- sql.append(" ORDER BY ")
- .append(orderBy)
- .append(" FETCH NEXT ")
- .append(limit)
- .append(" ROWS ONLY");
- }
- return sql.toString();
- }
-
- public static String connectionString(JdbcConfiguration config) {
- return config.getString(URL) != null
- ? config.getString(URL)
- : ConnectorAdapter.parse(config.getString("connection.adapter")).getConnectionUrl();
- }
-
- private static ConnectionFactory resolveConnectionFactory(JdbcConfiguration config) {
- return JdbcConnection.patternBasedFactory(connectionString(config));
- }
-
- /**
- * Determine whether the Oracle server has the archive log enabled.
- *
- * @return {@code true} if the server's {@code LOG_MODE} is set to {@code ARCHIVELOG}, or {@code
- * false} otherwise
- */
- protected boolean isArchiveLogMode() {
- try {
- final String mode =
- queryAndMap(
- "SELECT LOG_MODE FROM V$DATABASE",
- rs -> rs.next() ? rs.getString(1) : "");
- LOGGER.debug("LOG_MODE={}", mode);
- return "ARCHIVELOG".equalsIgnoreCase(mode);
- } catch (SQLException e) {
- throw new DebeziumException(
- "Unexpected error while connecting to Oracle and looking at LOG_MODE mode: ",
- e);
- }
- }
-
- /**
- * Resolve a system change number to a timestamp, return value is in database timezone.
- *
- * <p>The SCN to TIMESTAMP mapping is only retained for the duration of the flashback query
- * area. This means that eventually the mapping between these values are no longer kept by
- * Oracle and making a call with a SCN value that has aged out will result in an ORA-08181
- * error. This function explicitly checks for this use case and if a ORA-08181 error is thrown,
- * it is therefore treated as if a value does not exist returning an empty optional value.
- *
- * @param scn the system change number, must not be {@code null}
- * @return an optional timestamp when the system change number occurred
- * @throws SQLException if a database exception occurred
- */
- public Optional<OffsetDateTime> getScnToTimestamp(Scn scn) throws SQLException {
- try {
- return queryAndMap(
- "SELECT scn_to_timestamp('" + scn + "') FROM DUAL",
- rs ->
- rs.next()
- ? Optional.of(rs.getObject(1, OffsetDateTime.class))
- : Optional.empty());
- } catch (SQLException e) {
- if (e.getMessage().startsWith("ORA-08181")) {
- // ORA-08181 specified number is not a valid system change number
- // This happens when the SCN provided is outside the flashback area range
- // This should be treated as a value is not available rather than an error
- return Optional.empty();
- }
- // Any other SQLException should be thrown
- throw e;
- }
- }
-
- @Override
- protected ColumnEditor overrideColumn(ColumnEditor column) {
- // This allows the column state to be overridden before default-value resolution so that the
- // output of the default value is within the same precision as that of the column values.
- if (OracleTypes.TIMESTAMP == column.jdbcType()) {
- column.length(column.scale().orElse(Column.UNSET_INT_VALUE)).scale(null);
- } else if (OracleTypes.NUMBER == column.jdbcType()) {
- column.scale().filter(s -> s == ORACLE_UNSET_SCALE).ifPresent(s -> column.scale(null));
- }
- return column;
- }
-
- @Override
- protected Map<TableId, List<Column>> getColumnsDetails(
- String databaseCatalog,
- String schemaNamePattern,
- String tableName,
- Tables.TableFilter tableFilter,
- ColumnNameFilter columnFilter,
- DatabaseMetaData metadata,
- Set<TableId> viewIds)
- throws SQLException {
- // The Oracle JDBC driver expects that if the table name contains a "/" character that
- // the table name is pre-escaped prior to the JDBC driver call, or else it throws an
- // exception about the character sequence being improperly escaped.
- if (tableName != null && tableName.contains("/")) {
- tableName = tableName.replace("/", "//");
- }
- return super.getColumnsDetails(
- databaseCatalog,
- schemaNamePattern,
- tableName,
- tableFilter,
- columnFilter,
- metadata,
- viewIds);
- }
-
- /**
- * An exception that indicates the operation failed because the table is not a relational table.
- */
- public static class NonRelationalTableException extends DebeziumException {
- public NonRelationalTableException(String message) {
- super(message);
- }
- }
-}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java
index 53387e3cd..59a5cdd66 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java
@@ -58,7 +58,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
/**
- * Copied from Debezium 1.9.7.
+ * Copied from Debezium 1.9.8.Final.
*
 * <p>Line 356: Replace < condition with <= to be able to catch ongoing transactions during snapshot
* if current SCN points to START/INSERT/DELETE/UPDATE event.
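To see why the "<" versus "<=" boundary matters when collecting in-progress transactions at snapshot time: with "<", a transaction whose START event sits exactly at the snapshot SCN is skipped, while "<=" includes it. A toy sketch under the assumption that SCNs can be modeled as longs; the class and record shape are illustrative, not LogMinerAdapter's actual types.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PendingTransactionSketch {
    static final class Txn {
        final String id;
        final long startScn;
        Txn(String id, long startScn) { this.id = id; this.startScn = startScn; }
        @Override public String toString() { return id + "@" + startScn; }
    }

    static List<Txn> pendingAt(long snapshotScn, List<Txn> txns) {
        List<Txn> pending = new ArrayList<>();
        for (Txn t : txns) {
            if (t.startScn <= snapshotScn) { // was "<" before the change, which missed t2 below
                pending.add(t);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        List<Txn> txns = Arrays.asList(new Txn("t1", 90L), new Txn("t2", 100L));
        System.out.println(pendingAt(100L, txns)); // [t1@90, t2@100]
    }
}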
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java
index 4a743dfa4..4969c1abd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java
@@ -22,7 +22,12 @@ import org.apache.kafka.connect.header.ConnectHeaders;
import java.util.Optional;
-/** Emits change records based on an event read from Oracle LogMiner. */
+/**
+ * Copied from Debezium 1.9.8.Final. Emits change records based on an event read from Oracle
+ * LogMiner.
+ *
+ * <p>This class adds RowId and overrides the emit methods to put rowId in the header.
+ */
public class LogMinerChangeRecordEmitter extends BaseChangeRecordEmitter
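The general pattern of carrying the LogMiner row id downstream via Kafka Connect headers (ConnectHeaders is already imported by this class) looks roughly like the sketch below. The header key "ROW_ID" and the helper method are assumptions for illustration, not the emitter's actual fields.

import org.apache.kafka.connect.header.ConnectHeaders;

class RowIdHeaderSketch {
    static ConnectHeaders headersWithRowId(String rowId) {
        ConnectHeaders headers = new ConnectHeaders();
        if (rowId != null) {
            headers.addString("ROW_ID", rowId); // downstream consumers can read the originating ROWID
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(headersWithRowId("AAAabcAAFAAAACFAAA"));
    }
}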