[debezium] Upgrade to Debezium 1.9.8.Final(#3033) (#3034)

Hongshun Wang 1 year ago committed by GitHub
parent 10e542c3ff
commit 7ea197242f

@@ -42,7 +42,7 @@ import java.util.Map;
 import static com.ververica.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;

-/** Copied from {@link AlterTableParserListener} in Debezium 1.9.7.Final. */
+/** Copied from {@link AlterTableParserListener} in Debezium 1.9.8.Final. */
 public class CustomAlterTableParserListener extends MySqlParserBaseListener {
     private static final int STARTING_INDEX = 1;

@@ -48,7 +48,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;

 /**
- * Copied from {@link MySqlAntlrDdlParserListener} in Debezium 1.9.7.final.
+ * Copied from {@link MySqlAntlrDdlParserListener} in Debezium 1.9.8.final.
  *
  * <p>This listener's constructor will use some modified listener.
  */

@@ -22,7 +22,7 @@ import java.util.Objects;
 import java.util.Optional;

 /**
- * Copied from Debezium 1.9.7.Final.
+ * Copied from Debezium 1.9.8.Final.
  *
  * <p>Base class for {@link ChangeRecordEmitter} implementations based on a relational database.
  *

@@ -38,7 +38,7 @@ import java.util.OptionalLong;
 import java.util.Properties;

 /**
- * Copied from Debezium project(1.9.7.final) to add custom jdbc properties in the jdbc url. The new
+ * Copied from Debezium project(1.9.8.final) to add custom jdbc properties in the jdbc url. The new
  * parameter {@code jdbcProperties} in the constructor of {@link MySqlConnectionConfiguration} will
  * be used to generate the jdbc url pattern, and may overwrite the default value.
  *
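
For illustration, a minimal sketch of the override behavior this Javadoc describes, with illustrative names rather than the copied MySqlConnection code: user-supplied jdbcProperties are merged over the connector defaults before the URL is rendered, so a user value such as useSSL=true replaces the shipped default.

import java.util.Properties;
import java.util.stream.Collectors;

// Illustrative sketch only: user jdbcProperties overwrite defaults when
// composing the MySQL JDBC url, as the copied class's Javadoc describes.
public final class JdbcUrlSketch {
    static String buildUrl(String host, int port, Properties defaults, Properties jdbcProperties) {
        Properties merged = new Properties();
        merged.putAll(defaults); // shipped defaults, e.g. useSSL=false
        merged.putAll(jdbcProperties); // user values win and may overwrite defaults
        String query =
                merged.stringPropertyNames().stream()
                        .map(key -> key + "=" + merged.getProperty(key))
                        .collect(Collectors.joining("&"));
        return "jdbc:mysql://" + host + ":" + port + "?" + query;
    }
}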

@@ -35,8 +35,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;

 /**
- * Copied from Debezium project(1.9.7.final) to add BIGINT and SMALLINT to TRIM_DATA_TYPES. Remove
- * this when https://issues.redhat.com/browse/DBZ-6824 is fixed.
+ * Copied from Debezium project(1.9.8.final) to add BIGINT and SMALLINT to TRIM_DATA_TYPES. Remove
+ * this until https://issues.redhat.com/browse/DBZ-6824 is fixed in 2.3.3.Final.
  *
  * <p>Line 81 & 82: add BIGINT and SMALLINT to TRIM_DATA_TYPES.
  */
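
The gist of the DBZ-6824 workaround, as a hedged sketch with illustrative names (the real set lives in Debezium's MySQL default-value handling): MySQL can report defaults for integer-family columns with stray whitespace, and trimming before numeric parsing avoids a NumberFormatException.

import java.util.Locale;
import java.util.Set;

// Illustrative sketch, not the copied class: trim whitespace from default
// values of integer-family columns before they are parsed as numbers.
final class TrimDefaultValueSketch {
    private static final Set<String> TRIM_DATA_TYPES =
            Set.of("TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT");

    static String normalize(String dataType, String defaultValue) {
        if (defaultValue != null
                && TRIM_DATA_TYPES.contains(dataType.toUpperCase(Locale.ROOT))) {
            return defaultValue.trim(); // "10 " -> "10", keeps Long.parseLong happy
        }
        return defaultValue;
    }
}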

@@ -84,7 +84,7 @@ import java.util.function.Predicate;
 import static io.debezium.util.Strings.isNullOrEmpty;

 /**
- * Copied from Debezium project to fix
+ * Copied from Debezium project(1.9.8.Final) to fix
  * https://github.com/ververica/flink-cdc-connectors/issues/1944.
  *
  * <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously

@@ -48,8 +48,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;

 /**
- * Copied from Debezium project to fix FLOAT converted to FLOAT64 type issue. See DBZ-3865,
- * DBZ-5843. Remove it when debezium version is upgraded above 2.0.0.Final.
+ * Copied from Debezium project(1.9.8.Final) to fix FLOAT converted to FLOAT64 type issue. See
+ * DBZ-3865, DBZ-5843. Remove it when debezium version is upgraded above 2.0.0.Final.
  *
  * <p>Line 240 & 246: add FLOAT type adjustment.
  *
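
A hedged sketch of the FLOAT adjustment (DBZ-3865, DBZ-5843), not the copied converter: a MySQL FLOAT with no precision, or a precision up to 24, occupies 4 bytes and is faithfully represented as Connect FLOAT32; only wider precisions, which MySQL itself stores as a double, warrant FLOAT64.

import org.apache.kafka.connect.data.SchemaBuilder;

// Illustrative only: choose the Connect schema for a MySQL FLOAT column.
final class FloatSchemaSketch {
    static SchemaBuilder schemaForMySqlFloat(Integer precision) {
        return (precision == null || precision <= 24)
                ? SchemaBuilder.float32() // 4-byte float
                : SchemaBuilder.float64(); // MySQL promotes FLOAT(p>24) to DOUBLE
    }
}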

@@ -13,11 +13,11 @@ import io.debezium.relational.ColumnEditor;
 import java.util.concurrent.atomic.AtomicReference;

 /**
- * Copied from Debezium project(v1.9.7.Final) to fix
+ * Copied from Debezium project(v1.9.8.Final) to fix
  * https://github.com/ververica/flink-cdc-connectors/issues/1506.
  *
  * <p>Line 48~59: use the actual default string value when the sql contains COLLATE. We should
- * remove this class after we bumped a higher debezium version where the
+ * remove this class after we bumped debezium 2.0 where the
  * https://issues.redhat.com/browse/DBZ-5587 has been fixed.
  */
 public class DefaultValueParserListener extends MySqlParserBaseListener {
@@ -95,7 +95,10 @@ public class DefaultValueParserListener extends MySqlParserBaseListener {
     }

     private String unquote(String stringLiteral) {
-        return stringLiteral.substring(1, stringLiteral.length() - 1);
+        if (stringLiteral != null && stringLiteral.startsWith("'") && stringLiteral.endsWith("'")) {
+            return stringLiteral.substring(1, stringLiteral.length() - 1);
+        }
+        return stringLiteral;
     }

     private String unquoteBinary(String stringLiteral) {
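
The behavioral difference the new guard buys, shown standalone with the same logic as the added lines above:

// Same logic as the patched unquote(...) in this hunk.
public final class UnquoteDemo {
    private static String unquote(String stringLiteral) {
        if (stringLiteral != null
                && stringLiteral.startsWith("'")
                && stringLiteral.endsWith("'")) {
            return stringLiteral.substring(1, stringLiteral.length() - 1);
        }
        return stringLiteral;
    }

    public static void main(String[] args) {
        System.out.println(unquote("'abc'")); // abc
        System.out.println(unquote("abc")); // abc (the old one-liner returned "b")
        System.out.println(unquote(null)); // null (the old one-liner threw NPE)
    }
}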

@@ -1,640 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.debezium.connector.oracle;
import io.debezium.DebeziumException;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.util.Strings;
import oracle.jdbc.OracleTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Clob;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Copied from Debezium 1.9.7.Final.
*
* <p>Line 213-225: Fixed for DBZ-5738.
*/
public class OracleConnection extends JdbcConnection {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnection.class);
/** Returned by column metadata in Oracle if no scale is set; */
private static final int ORACLE_UNSET_SCALE = -127;
/** Pattern to identify system generated indices and column names. */
private static final Pattern SYS_NC_PATTERN =
Pattern.compile("^SYS_NC(?:_OID|_ROWINFO|[0-9][0-9][0-9][0-9][0-9])\\$$");
/** Pattern to identify abstract data type indices and column names. */
private static final Pattern ADT_INDEX_NAMES_PATTERN = Pattern.compile("^\".*\"\\.\".*\".*");
/**
* Pattern to identify a hidden column based on redefining a table with the {@code ROWID}
* option.
*/
private static final Pattern MROW_PATTERN = Pattern.compile("^M_ROW\\$\\$");
/** A field for the raw jdbc url. This field has no default value. */
private static final Field URL = Field.create("url", "Raw JDBC url");
/** The database version. */
private final OracleDatabaseVersion databaseVersion;
private static final String QUOTED_CHARACTER = "\"";
public OracleConnection(JdbcConfiguration config, Supplier<ClassLoader> classLoaderSupplier) {
this(config, classLoaderSupplier, true);
}
public OracleConnection(
JdbcConfiguration config,
Supplier<ClassLoader> classLoaderSupplier,
boolean showVersion) {
super(
config,
resolveConnectionFactory(config),
classLoaderSupplier,
QUOTED_CHARACTER,
QUOTED_CHARACTER);
this.databaseVersion = resolveOracleDatabaseVersion();
if (showVersion) {
LOGGER.info("Database Version: {}", databaseVersion.getBanner());
}
}
public void setSessionToPdb(String pdbName) {
Statement statement = null;
try {
statement = connection().createStatement();
statement.execute("alter session set container=" + pdbName);
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
LOGGER.error("Couldn't close statement", e);
}
}
}
}
public void resetSessionToCdb() {
Statement statement = null;
try {
statement = connection().createStatement();
statement.execute("alter session set container=cdb$root");
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
LOGGER.error("Couldn't close statement", e);
}
}
}
}
public OracleDatabaseVersion getOracleVersion() {
return databaseVersion;
}
private OracleDatabaseVersion resolveOracleDatabaseVersion() {
String versionStr;
try {
try {
// Oracle 18.1 introduced BANNER_FULL as the new column rather than BANNER
// This column uses a different format than the legacy BANNER.
versionStr =
queryAndMap(
"SELECT BANNER_FULL FROM V$VERSION WHERE BANNER_FULL LIKE 'Oracle Database%'",
(rs) -> {
if (rs.next()) {
return rs.getString(1);
}
return null;
});
} catch (SQLException e) {
// exception ignored
if (e.getMessage().contains("ORA-00904: \"BANNER_FULL\"")) {
LOGGER.debug(
"BANNER_FULL column not in V$VERSION, using BANNER column as fallback");
versionStr = null;
} else {
throw e;
}
}
// For databases prior to 18.1, a SQLException will be thrown due to BANNER_FULL not
// being a column and
// this will cause versionStr to remain null, use fallback column BANNER for versions
// prior to 18.1.
if (versionStr == null) {
versionStr =
queryAndMap(
"SELECT BANNER FROM V$VERSION WHERE BANNER LIKE 'Oracle Database%'",
(rs) -> {
if (rs.next()) {
return rs.getString(1);
}
return null;
});
}
} catch (SQLException e) {
throw new RuntimeException("Failed to resolve Oracle database version", e);
}
if (versionStr == null) {
throw new RuntimeException("Failed to resolve Oracle database version");
}
return OracleDatabaseVersion.parse(versionStr);
}
@Override
public Set<TableId> readTableNames(
String databaseCatalog,
String schemaNamePattern,
String tableNamePattern,
String[] tableTypes)
throws SQLException {
Set<TableId> tableIds =
super.readTableNames(null, schemaNamePattern, tableNamePattern, tableTypes);
return tableIds.stream()
.map(t -> new TableId(databaseCatalog, t.schema(), t.table()))
.collect(Collectors.toSet());
}
/**
* Retrieves all {@code TableId} in a given database catalog, filtering certain ids that should
* be omitted from the returned set such as special spatial tables and index-organized tables.
*
* @param catalogName the catalog/database name
* @return set of all table ids for existing table objects
* @throws SQLException if a database exception occurred
*/
protected Set<TableId> getAllTableIds(String catalogName) throws SQLException {
final String query =
"select owner, table_name from all_tables "
+
// filter special spatial tables
"where table_name NOT LIKE 'MDRT_%' "
+ "and table_name NOT LIKE 'MDRS_%' "
+ "and table_name NOT LIKE 'MDXT_%' "
+
// filter index-organized-tables
"and (table_name NOT LIKE 'SYS_IOT_OVER_%' and IOT_NAME IS NULL) "
// filter nested tables
+ "and nested = 'NO'"
// filter parent tables of nested tables
+ "and table_name not in (select PARENT_TABLE_NAME from ALL_NESTED_TABLES)";
Set<TableId> tableIds = new HashSet<>();
query(
query,
(rs) -> {
while (rs.next()) {
tableIds.add(new TableId(catalogName, rs.getString(1), rs.getString(2)));
}
LOGGER.trace("TableIds are: {}", tableIds);
});
return tableIds;
}
// todo replace metadata with something like this
private ResultSet getTableColumnsInfo(String schemaNamePattern, String tableName)
throws SQLException {
String columnQuery =
"select column_name, data_type, data_length, data_precision, data_scale, default_length, density, char_length from "
+ "all_tab_columns where owner like '"
+ schemaNamePattern
+ "' and table_name='"
+ tableName
+ "'";
PreparedStatement statement = connection().prepareStatement(columnQuery);
return statement.executeQuery();
}
// this is much faster, we will use it until full replacement of the metadata usage TODO
public void readSchemaForCapturedTables(
Tables tables,
String databaseCatalog,
String schemaNamePattern,
ColumnNameFilter columnFilter,
boolean removeTablesNotFoundInJdbc,
Set<TableId> capturedTables)
throws SQLException {
Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
DatabaseMetaData metadata = connection().getMetaData();
Map<TableId, List<Column>> columnsByTable = new HashMap<>();
for (TableId tableId : capturedTables) {
try (ResultSet columnMetadata =
metadata.getColumns(
databaseCatalog, schemaNamePattern, tableId.table(), null)) {
while (columnMetadata.next()) {
// add all whitelisted columns
readTableColumn(columnMetadata, tableId, columnFilter)
.ifPresent(
column -> {
columnsByTable
.computeIfAbsent(tableId, t -> new ArrayList<>())
.add(column.create());
});
}
}
}
// Read the metadata for the primary keys ...
for (Map.Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
// First get the primary key information, which must be done for *each* table ...
List<String> pkColumnNames = readPrimaryKeyNames(metadata, tableEntry.getKey());
// Then define the table ...
List<Column> columns = tableEntry.getValue();
Collections.sort(columns);
tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, null);
}
if (removeTablesNotFoundInJdbc) {
// Remove any definitions for tables that were not found in the database metadata ...
tableIdsBefore.removeAll(columnsByTable.keySet());
tableIdsBefore.forEach(tables::removeTable);
}
}
@Override
protected String resolveCatalogName(String catalogName) {
final String pdbName = config().getString("pdb.name");
return (!Strings.isNullOrEmpty(pdbName) ? pdbName : config().getString("dbname"))
.toUpperCase();
}
@Override
public List<String> readTableUniqueIndices(DatabaseMetaData metadata, TableId id)
throws SQLException {
return super.readTableUniqueIndices(metadata, id.toDoubleQuoted());
}
@Override
public Optional<Timestamp> getCurrentTimestamp() throws SQLException {
return queryAndMap(
"SELECT CURRENT_TIMESTAMP FROM DUAL",
rs -> rs.next() ? Optional.of(rs.getTimestamp(1)) : Optional.empty());
}
@Override
protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
if (columnName != null) {
return !SYS_NC_PATTERN.matcher(columnName).matches()
&& !ADT_INDEX_NAMES_PATTERN.matcher(columnName).matches()
&& !MROW_PATTERN.matcher(columnName).matches();
}
return false;
}
/**
* Get the current, most recent system change number.
*
* @return the current system change number
* @throws SQLException if an exception occurred
* @throws IllegalStateException if the query does not return at least one row
*/
public Scn getCurrentScn() throws SQLException {
return queryAndMap(
"SELECT CURRENT_SCN FROM V$DATABASE",
(rs) -> {
if (rs.next()) {
return Scn.valueOf(rs.getString(1));
}
throw new IllegalStateException("Could not get SCN");
});
}
/**
* Generate a given table's DDL metadata.
*
* @param tableId table identifier, should never be {@code null}
* @return generated DDL
* @throws SQLException if an exception occurred obtaining the DDL metadata
* @throws NonRelationalTableException the table is not a relational table
*/
public String getTableMetadataDdl(TableId tableId)
throws SQLException, NonRelationalTableException {
try {
// This table contains all available objects that are considered relational & object
// based.
// By querying for TABLE_TYPE is null, we are explicitly confirming what if an entry
// exists
// that the table is in-fact a relational table and if the result set is empty, the
// object
// is another type, likely an object-based table, in which case we cannot generate DDL.
final String tableType =
"SELECT COUNT(1) FROM ALL_ALL_TABLES WHERE OWNER='"
+ tableId.schema()
+ "' AND TABLE_NAME='"
+ tableId.table()
+ "' AND TABLE_TYPE IS NULL";
if (queryAndMap(tableType, rs -> rs.next() ? rs.getInt(1) : 0) == 0) {
throw new NonRelationalTableException(
"Table " + tableId + " is not a relational table");
}
// The storage and segment attributes aren't necessary
executeWithoutCommitting(
"begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;");
executeWithoutCommitting(
"begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;");
// In case DDL is returned as multiple DDL statements, this allows the parser to parse
// each separately.
// This is only critical during streaming as during snapshot the table structure is
// built from JDBC driver queries.
executeWithoutCommitting(
"begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SQLTERMINATOR', true); end;");
return queryAndMap(
"SELECT dbms_metadata.get_ddl('TABLE','"
+ tableId.table()
+ "','"
+ tableId.schema()
+ "') FROM DUAL",
rs -> {
if (!rs.next()) {
throw new DebeziumException(
"Could not get DDL metadata for table: " + tableId);
}
Object res = rs.getObject(1);
return ((Clob) res).getSubString(1, (int) ((Clob) res).length());
});
} finally {
executeWithoutCommitting(
"begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'DEFAULT'); end;");
}
}
/**
* Get the current connection's session statistic by name.
*
* @param name the name of the statistic to be fetched, must not be {@code null}
* @return the session statistic value, never {@code null}
* @throws SQLException if an exception occurred obtaining the session statistic value
*/
public Long getSessionStatisticByName(String name) throws SQLException {
return queryAndMap(
"SELECT VALUE FROM v$statname n, v$mystat m WHERE n.name='"
+ name
+ "' AND n.statistic#=m.statistic#",
rs -> rs.next() ? rs.getLong(1) : 0L);
}
/**
* Returns whether the given table exists or not.
*
* @param tableName table name, should not be {@code null}
* @return true if the table exists, false if it does not
* @throws SQLException if a database exception occurred
*/
public boolean isTableExists(String tableName) throws SQLException {
return queryAndMap(
"SELECT COUNT(1) FROM USER_TABLES WHERE TABLE_NAME = '" + tableName + "'",
rs -> rs.next() && rs.getLong(1) > 0);
}
public boolean isTableExists(TableId tableId) throws SQLException {
return queryAndMap(
"SELECT COUNT(1) FROM ALL_TABLES WHERE OWNER = '"
+ tableId.schema()
+ "' AND TABLE_NAME = '"
+ tableId.table()
+ "'",
rs -> rs.next() && rs.getLong(1) > 0);
}
/**
* Returns whether the given table is empty or not.
*
* @param tableName table name, should not be {@code null}
* @return true if the table has no records, false otherwise
* @throws SQLException if a database exception occurred
*/
public boolean isTableEmpty(String tableName) throws SQLException {
return getRowCount(tableName) == 0L;
}
/**
* Returns the number of rows in a given table.
*
* @param tableName table name, should not be {@code null}
* @return the number of rows
* @throws SQLException if a database exception occurred
*/
public long getRowCount(String tableName) throws SQLException {
return queryAndMap(
"SELECT COUNT(1) FROM " + tableName,
rs -> {
if (rs.next()) {
return rs.getLong(1);
}
return 0L;
});
}
public <T> T singleOptionalValue(String query, ResultSetExtractor<T> extractor)
throws SQLException {
return queryAndMap(query, rs -> rs.next() ? extractor.apply(rs) : null);
}
@Override
public String buildSelectWithRowLimits(
TableId tableId,
int limit,
String projection,
Optional<String> condition,
String orderBy) {
final TableId table = new TableId(null, tableId.schema(), tableId.table());
final StringBuilder sql = new StringBuilder("SELECT ");
sql.append(projection).append(" FROM ");
sql.append(quotedTableIdString(table));
if (condition.isPresent()) {
sql.append(" WHERE ").append(condition.get());
}
if (getOracleVersion().getMajor() < 12) {
sql.insert(0, " SELECT * FROM (")
.append(" ORDER BY ")
.append(orderBy)
.append(")")
.append(" WHERE ROWNUM <=")
.append(limit);
} else {
sql.append(" ORDER BY ")
.append(orderBy)
.append(" FETCH NEXT ")
.append(limit)
.append(" ROWS ONLY");
}
return sql.toString();
}
public static String connectionString(JdbcConfiguration config) {
return config.getString(URL) != null
? config.getString(URL)
: ConnectorAdapter.parse(config.getString("connection.adapter")).getConnectionUrl();
}
private static ConnectionFactory resolveConnectionFactory(JdbcConfiguration config) {
return JdbcConnection.patternBasedFactory(connectionString(config));
}
/**
* Determine whether the Oracle server has the archive log enabled.
*
* @return {@code true} if the server's {@code LOG_MODE} is set to {@code ARCHIVELOG}, or {@code
* false} otherwise
*/
protected boolean isArchiveLogMode() {
try {
final String mode =
queryAndMap(
"SELECT LOG_MODE FROM V$DATABASE",
rs -> rs.next() ? rs.getString(1) : "");
LOGGER.debug("LOG_MODE={}", mode);
return "ARCHIVELOG".equalsIgnoreCase(mode);
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to Oracle and looking at LOG_MODE mode: ",
e);
}
}
/**
* Resolve a system change number to a timestamp, return value is in database timezone.
*
* <p>The SCN to TIMESTAMP mapping is only retained for the duration of the flashback query
* area. This means that eventually the mapping between these values are no longer kept by
* Oracle and making a call with a SCN value that has aged out will result in an ORA-08181
* error. This function explicitly checks for this use case and if a ORA-08181 error is thrown,
* it is therefore treated as if a value does not exist returning an empty optional value.
*
* @param scn the system change number, must not be {@code null}
* @return an optional timestamp when the system change number occurred
* @throws SQLException if a database exception occurred
*/
public Optional<OffsetDateTime> getScnToTimestamp(Scn scn) throws SQLException {
try {
return queryAndMap(
"SELECT scn_to_timestamp('" + scn + "') FROM DUAL",
rs ->
rs.next()
? Optional.of(rs.getObject(1, OffsetDateTime.class))
: Optional.empty());
} catch (SQLException e) {
if (e.getMessage().startsWith("ORA-08181")) {
// ORA-08181 specified number is not a valid system change number
// This happens when the SCN provided is outside the flashback area range
// This should be treated as a value is not available rather than an error
return Optional.empty();
}
// Any other SQLException should be thrown
throw e;
}
}
@Override
protected ColumnEditor overrideColumn(ColumnEditor column) {
// This allows the column state to be overridden before default-value resolution so that the
// output of the default value is within the same precision as that of the column values.
if (OracleTypes.TIMESTAMP == column.jdbcType()) {
column.length(column.scale().orElse(Column.UNSET_INT_VALUE)).scale(null);
} else if (OracleTypes.NUMBER == column.jdbcType()) {
column.scale().filter(s -> s == ORACLE_UNSET_SCALE).ifPresent(s -> column.scale(null));
}
return column;
}
@Override
protected Map<TableId, List<Column>> getColumnsDetails(
String databaseCatalog,
String schemaNamePattern,
String tableName,
Tables.TableFilter tableFilter,
ColumnNameFilter columnFilter,
DatabaseMetaData metadata,
Set<TableId> viewIds)
throws SQLException {
// The Oracle JDBC driver expects that if the table name contains a "/" character that
// the table name is pre-escaped prior to the JDBC driver call, or else it throws an
// exception about the character sequence being improperly escaped.
if (tableName != null && tableName.contains("/")) {
tableName = tableName.replace("/", "//");
}
return super.getColumnsDetails(
databaseCatalog,
schemaNamePattern,
tableName,
tableFilter,
columnFilter,
metadata,
viewIds);
}
/**
* An exception that indicates the operation failed because the table is not a relational table.
*/
public static class NonRelationalTableException extends DebeziumException {
public NonRelationalTableException(String message) {
super(message);
}
}
}

@@ -58,7 +58,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;

 /**
- * Copied from Debezium 1.9.7.
+ * Copied from Debezium 1.9.8.Final.
  *
  * <p>Line 356: Replace < condition with <= to be able to catch ongoing transactions during snapshot
  * if current SCN points to START/INSERT/DELETE/UPDATE event.
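
The single-character change called out above, restated as a hedged sketch (names are illustrative; Debezium wraps SCNs in its own Scn type):

import java.math.BigInteger;

// Illustrative: with '<' an event whose SCN equals the snapshot boundary is
// skipped; '<=' includes it, so an ongoing transaction whose START/DML event
// sits exactly on the boundary is not lost.
final class ScnBoundarySketch {
    static boolean include(BigInteger eventScn, BigInteger boundaryScn) {
        return eventScn.compareTo(boundaryScn) <= 0; // was: < 0
    }
}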

@@ -22,7 +22,12 @@ import org.apache.kafka.connect.header.ConnectHeaders;
 import java.util.Optional;

-/** Emits change records based on an event read from Oracle LogMiner. */
+/**
+ * Copied from Debezium 1.9.8.Final. Emits change records based on an event read from Oracle
+ * LogMiner.
+ *
+ * <p>This class add RowId and overrides the emit methods to put rowId in the header.
+ */
 public class LogMinerChangeRecordEmitter extends BaseChangeRecordEmitter<Object> {
     private final Operation operation;
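
A sketch of the header idea from the new Javadoc; the emitter's actual override signatures are not part of this hunk, so the names and the header key below are illustrative:

import org.apache.kafka.connect.header.ConnectHeaders;

// Illustrative: attach the LogMiner row id to the record's Connect headers so
// downstream consumers can correlate a change event with its source row.
final class RowIdHeaderSketch {
    static ConnectHeaders withRowId(String rowId) {
        ConnectHeaders headers = new ConnectHeaders();
        headers.addString("ROW_ID", rowId);
        return headers;
    }
}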

@@ -63,7 +63,7 @@ import static io.debezium.connector.oracle.logminer.LogMinerHelper.logError;
 import static io.debezium.connector.oracle.logminer.LogMinerHelper.setLogFilesForMining;

 /**
- * Copied from Debezium 1.9.7. A {@link StreamingChangeEventSource} based on Oracle's LogMiner
+ * Copied from Debezium 1.9.8.Final. A {@link StreamingChangeEventSource} based on Oracle's LogMiner
  * utility. The event handler loop is executed in a separate executor.
  *
  * <p>Diff: Make createProcessor method as protected to produce a LogMinerEventProcessor with

@@ -54,7 +54,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;

 /**
- * Copied from Debezium 1.9.7.Final.
+ * Copied from Debezium 1.9.8.Final.
  *
  * <p>An abstract implementation of {@link LogMinerEventProcessor} that all processors should
  * extend.

@@ -35,7 +35,7 @@ import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicReference;

 /**
- * Copied from Debezium 1.9.7.
+ * Copied from Debezium 1.9.8.final
  *
  * <p>Line 150~151 : set the ending lsn for the replication connection.
  */

@@ -11,10 +11,11 @@ import org.postgresql.replication.LogSequenceNumber;
 import java.nio.ByteBuffer;

 /**
- * Copied from Debezium 1.9.7 without changes due to the NoSuchMethodError, caused by the fact that
- * current Debezium release java version is 11, so we need to compile this file by java 8 compiler.
- * <a href="https://www.morling.dev/blog/bytebuffer-and-the-dreaded-nosuchmethoderror/">More
- * info</a>. Abstraction of PostgreSQL log sequence number, adapted from {@link LogSequenceNumber}.
+ * Copied from Debezium 1.9.8.final without changes due to the NoSuchMethodError, caused by the fact
+ * that current Debezium release java version is 11, so we need to compile this file by java 8
+ * compiler. <a
+ * href="https://www.morling.dev/blog/bytebuffer-and-the-dreaded-nosuchmethoderror/">More info</a>.
+ * Abstraction of PostgreSQL log sequence number, adapted from {@link LogSequenceNumber}.
  *
  * <p>Line 32: add NO_STOPPING_LSN
  */

@@ -48,6 +48,7 @@ import java.sql.Statement;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
@@ -55,7 +56,7 @@ import java.util.regex.Pattern;
  * {@link JdbcConnection} connection extension used for connecting to Postgres instances.
  *
  * @author Horia Chiorean
- *     <p>Copied from Debezium 1.9.2-Final with two additional methods:
+ *     <p>Copied from Debezium 1.9.8-Final with three additional methods:
  *     <ul>
  *       <li>Constructor PostgresConnection( Configuration config, PostgresValueConverterBuilder
  *           valueConverterBuilder, ConnectionFactory factory) to allow passing a custom
@@ -549,7 +550,7 @@ public class PostgresConnection extends JdbcConnection {
     public Long currentTransactionId() throws SQLException {
         AtomicLong txId = new AtomicLong(0);
         query(
-                "select * from txid_current()",
+                "select (case pg_is_in_recovery() when 't' then 0 else txid_current() end) AS pg_current_txid",
                 rs -> {
                     if (rs.next()) {
                         txId.compareAndSet(0, rs.getLong(1));
@@ -570,7 +571,7 @@ public class PostgresConnection extends JdbcConnection {
         int majorVersion = connection().getMetaData().getDatabaseMajorVersion();
         query(
                 majorVersion >= 10
-                        ? "select * from pg_current_wal_lsn()"
+                        ? "select (case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) AS pg_current_wal_lsn"
                         : "select * from pg_current_xlog_location()",
                 rs -> {
                     if (!rs.next()) {
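
This query change and the txid_current() change above serve the same purpose: on a hot-standby server, txid_current() and pg_current_wal_lsn() fail with "cannot execute ... during recovery", so both queries now branch on pg_is_in_recovery(). A standalone JDBC sketch of the pattern:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Standalone illustration of the standby-safe query used in this hunk.
final class StandbySafeLsnQuery {
    static String currentWalLsn(Connection connection) throws SQLException {
        String sql =
                "select (case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn()"
                        + " else pg_current_wal_lsn() end) AS pg_current_wal_lsn";
        try (Statement stmt = connection.createStatement();
                ResultSet rs = stmt.executeQuery(sql)) {
            return rs.next() ? rs.getString(1) : null; // e.g. "0/16B3748"
        }
    }
}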
@@ -847,6 +848,17 @@ public class PostgresConnection extends JdbcConnection {
         return false;
     }

+    /**
+     * Retrieves all {@code TableId}s in a given database catalog, including partitioned tables.
+     *
+     * @param catalogName the catalog/database name
+     * @return set of all table ids for existing table objects
+     * @throws SQLException if a database exception occurred
+     */
+    public Set<TableId> getAllTableIds(String catalogName) throws SQLException {
+        return readTableNames(catalogName, null, null, new String[] {"TABLE", "PARTITIONED TABLE"});
+    }
+
     @FunctionalInterface
     public interface PostgresValueConverterBuilder {
         PostgresValueConverter build(TypeRegistry registry);
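
A usage sketch for the new helper, assuming an already-configured PostgresConnection: one call now discovers plain and partitioned tables together, where the previous discovery path passed only {"TABLE"} to readTableNames and missed partitioned parents.

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.relational.TableId;
import java.sql.SQLException;
import java.util.Set;

// Usage sketch: list every table, partitioned or not, in one call.
final class AllTableIdsUsage {
    static void printTables(PostgresConnection connection, String database) throws SQLException {
        Set<TableId> tableIds = connection.getAllTableIds(database);
        tableIds.forEach(id -> System.out.println(id.schema() + "." + id.table()));
    }
}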

@@ -49,14 +49,14 @@ import java.util.stream.Collectors;
 import static java.lang.Math.toIntExact;

 /**
- * Copied from Debezium 1.9.7.
+ * Copied from Debezium 1.9.8.Final
  *
  * <p>The {@link ReplicationConnection} created from {@code createReplicationStream} will hang when
  * the wal logs only contain the keepAliveMessage. Support to set an ending Lsn to stop hanging.
  *
- * <p>Line 82, 694~695 : add endingPos and its setter.
+ * <p>Line 83, 711~713 : add endingPos and its setter.
  *
- * <p>Line 554~559, 578~583: ReplicationStream from {@code createReplicationStream} will stop when
+ * <p>Line 571~576, 595~600: ReplicationStream from {@code createReplicationStream} will stop when
  * endingPos reached.
  */
 public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
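
A hedged sketch of the endingPos mechanism this Javadoc describes; field and method names are illustrative, the real additions live at the quoted line numbers:

import org.postgresql.replication.LogSequenceNumber;

// Illustrative: an explicit upper bound lets the stream terminate even when
// the WAL delivers nothing but keep-alive messages past that bound.
final class EndingPosSketch {
    private volatile LogSequenceNumber endingPos = LogSequenceNumber.INVALID_LSN;

    void setEndingPos(LogSequenceNumber lsn) {
        this.endingPos = lsn;
    }

    boolean reachedEnd(LogSequenceNumber lastReceivedLsn) {
        return endingPos != LogSequenceNumber.INVALID_LSN
                && lastReceivedLsn.asLong() >= endingPos.asLong();
    }
}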
@@ -72,6 +72,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
     private final PostgresConnectorConfig connectorConfig;
     private final Duration statusUpdateInterval;
     private final MessageDecoder messageDecoder;
+    private final PostgresConnection jdbcConnection;
     private final TypeRegistry typeRegistry;
     private final Properties streamParams;
@@ -98,7 +99,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
      * @param statusUpdateInterval the interval at which the replication connection should
      *     periodically send status
      * @param doSnapshot whether the connector is doing snapshot
-     * @param jdbcConnection general POstgreSQL JDBC connection
+     * @param jdbcConnection general PostgreSQL JDBC connection
      * @param typeRegistry registry with PostgreSQL types
      * @param streamParams additional parameters to pass to the replication stream
      * @param schema the schema; must not be null
@@ -136,6 +137,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
         this.statusUpdateInterval = statusUpdateInterval;
         this.messageDecoder =
                 plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection);
+        this.jdbcConnection = jdbcConnection;
         this.typeRegistry = typeRegistry;
         this.streamParams = streamParams;
         this.slotCreationInfo = null;
this.slotCreationInfo = null; this.slotCreationInfo = null;
@@ -204,44 +206,24 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
                             stmt.execute(createPublicationStmt);
                             break;
                         case FILTERED:
-                            try {
-                                Set<TableId> tablesToCapture = determineCapturedTables();
-                                tableFilterString =
-                                        tablesToCapture.stream()
-                                                .map(TableId::toDoubleQuotedString)
-                                                .collect(Collectors.joining(", "));
-                                if (tableFilterString.isEmpty()) {
-                                    throw new DebeziumException(
-                                            String.format(
-                                                    "No table filters found for filtered publication %s",
-                                                    publicationName));
-                                }
-                                createPublicationStmt =
-                                        String.format(
-                                                "CREATE PUBLICATION %s FOR TABLE %s;",
-                                                publicationName, tableFilterString);
-                                LOGGER.info(
-                                        "Creating Publication with statement '{}'",
-                                        createPublicationStmt);
-                                // Publication doesn't exist, create it but restrict to the
-                                // tableFilter.
-                                stmt.execute(createPublicationStmt);
-                            } catch (Exception e) {
-                                throw new ConnectException(
-                                        String.format(
-                                                "Unable to create filtered publication %s for %s",
-                                                publicationName, tableFilterString),
-                                        e);
-                            }
+                            createOrUpdatePublicationModeFilterted(
+                                    tableFilterString, stmt, false);
                             break;
                     }
                 } else {
-                    LOGGER.trace(
-                            "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server "
-                                    + "and will be used by the plugin",
-                            publicationName,
-                            plugin,
-                            database());
+                    switch (publicationAutocreateMode) {
+                        case FILTERED:
+                            createOrUpdatePublicationModeFilterted(
+                                    tableFilterString, stmt, true);
+                            break;
+                        default:
+                            LOGGER.trace(
+                                    "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server "
+                                            + "and will be used by the plugin",
+                                    publicationName,
+                                    plugin,
+                                    database());
+                    }
                 }
             }
         }
@@ -253,11 +235,46 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
         }
     }

+    private void createOrUpdatePublicationModeFilterted(
+            String tableFilterString, Statement stmt, boolean isUpdate) {
+        String createOrUpdatePublicationStmt;
+        try {
+            Set<TableId> tablesToCapture = determineCapturedTables();
+            tableFilterString =
+                    tablesToCapture.stream()
+                            .map(TableId::toDoubleQuotedString)
+                            .collect(Collectors.joining(", "));
+            if (tableFilterString.isEmpty()) {
+                throw new DebeziumException(
+                        String.format(
+                                "No table filters found for filtered publication %s",
+                                publicationName));
+            }
+            createOrUpdatePublicationStmt =
+                    isUpdate
+                            ? String.format(
+                                    "ALTER PUBLICATION %s SET TABLE %s;",
+                                    publicationName, tableFilterString)
+                            : String.format(
+                                    "CREATE PUBLICATION %s FOR TABLE %s;",
+                                    publicationName, tableFilterString);
+            LOGGER.info(
+                    isUpdate
+                            ? "Updating Publication with statement '{}'"
+                            : "Creating Publication with statement '{}'",
+                    createOrUpdatePublicationStmt);
+            stmt.execute(createOrUpdatePublicationStmt);
+        } catch (Exception e) {
+            throw new ConnectException(
+                    String.format(
+                            "Unable to %s filtered publication %s for %s",
+                            isUpdate ? "update" : "create", publicationName, tableFilterString),
+                    e);
+        }
+    }
+
     private Set<TableId> determineCapturedTables() throws Exception {
-        Set<TableId> allTableIds =
-                this.connect()
-                        .readTableNames(
-                                pgConnection().getCatalog(), null, null, new String[] {"TABLE"});
+        Set<TableId> allTableIds = jdbcConnection.getAllTableIds(connectorConfig.databaseName());

         Set<TableId> capturedTables = new HashSet<>();
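
The two statements the new helper can emit, extracted for clarity (the publication name is whatever the connector is configured with): ALTER PUBLICATION ... SET TABLE replaces the publication's table list, which is why the update path picks up tables added to the capture filter after the publication was first created.

// Illustrative: the statement shapes generated by
// createOrUpdatePublicationModeFilterted, keyed on isUpdate.
final class PublicationStatementSketch {
    static String statementFor(boolean isUpdate, String publicationName, String tableFilter) {
        return isUpdate
                ? String.format("ALTER PUBLICATION %s SET TABLE %s;", publicationName, tableFilter)
                : String.format("CREATE PUBLICATION %s FOR TABLE %s;", publicationName, tableFilter);
    }
}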

@@ -37,9 +37,9 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;

 /**
- * Copied from Debezium project(1.9.7.final) to add method {@link
+ * Copied from Debezium project(1.9.8.final) to add method {@link
  * SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. Also implemented
- * {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition,
+ * {@link SqlServerStreamingChangeEventSource#execute(ChangeEventSourceContext, SqlServerPartition,
  * SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data
  * capture functionality. A main loop polls database DDL change and change data tables and turns
  * them into change events.

@@ -74,7 +74,7 @@ under the License.
     <flink.version>1.18.0</flink.version>
     <flink.major.version>1.18</flink.major.version>
     <flink.shaded.version>17.0</flink.shaded.version>
-    <debezium.version>1.9.7.Final</debezium.version>
+    <debezium.version>1.9.8.Final</debezium.version>
     <tikv.version>3.2.0</tikv.version>
     <geometry.version>2.2.0</geometry.version>
     <testcontainers.version>1.18.3</testcontainers.version>
