From 94fa8b347f34220941de722c031cd62bf000488a Mon Sep 17 00:00:00 2001 From: molsion Date: Mon, 11 Jul 2022 22:06:22 +0800 Subject: [PATCH] [oracle] Introduce SCAN_INCREMENTAL_SNAPSHOT_ENABLED option for OracleTableSourceFactory --- .../cdc/connectors/tests/OracleE2eITCase.java | 99 +------ .../oracle/source/OracleDialect.java | 1 - .../oracle/source/OracleSourceBuilder.java | 8 +- .../source/config/OracleSourceConfig.java | 5 - .../config/OracleSourceConfigFactory.java | 1 - .../OracleSourceOptions.java} | 24 +- .../events/LatestFinishedSplitsSizeEvent.java | 50 ---- .../source/meta/split/OracleRedoLogSplit.java | 151 ----------- .../meta/split/OracleSnapshotSplit.java | 147 ----------- .../split/OracleSourceSplitSerializer.java | 198 -------------- .../oracle/source/meta/split/OracleSplit.java | 81 ------ .../reader/fetch/OracleScanFetchTask.java | 2 +- .../reader/fetch/OracleStreamFetchTask.java | 2 +- .../source/table/OracleTableSource.java | 241 ------------------ .../table/OracleTableSourceFactory.java | 174 ------------- .../oracle/source/utils/SerializerUtils.java | 125 --------- .../oracle/table/OracleTableSource.java | 80 +++--- .../table/OracleTableSourceFactory.java | 121 ++++----- .../org.apache.flink.table.factories.Factory | 1 - .../oracle/source/OracleSourceITCase.java | 4 +- .../table/OracleTableSourceFactoryTest.java | 75 ++---- 21 files changed, 142 insertions(+), 1448 deletions(-) rename flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/{meta/events/LatestFinishedSplitsSizeRequestEvent.java => config/OracleSourceOptions.java} (55%) delete mode 100644 flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/events/LatestFinishedSplitsSizeEvent.java delete mode 100644 flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleRedoLogSplit.java delete mode 100644 flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSnapshotSplit.java delete mode 100644 flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSourceSplitSerializer.java delete mode 100644 flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSplit.java delete mode 100644 flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/table/OracleTableSource.java delete mode 100644 flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/table/OracleTableSourceFactory.java delete mode 100644 flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/SerializerUtils.java diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java index 71b7cb480..d2cbc8ccd 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java @@ -91,109 +91,14 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment { " 'connector' = 'oracle-cdc',", " 'hostname' = '" + INTER_CONTAINER_ORACLE_ALIAS + "',", " 'port' = '" + ORACLE_PORT + "',", - " 'username' = '" + ORACLE_TEST_USER + "',", - " 'password' = '" + ORACLE_TEST_PASSWORD + "',", - " 'database-name' = 'XE',", - " 'schema-name' = 'debezium',", - " 'debezium.log.mining.strategy' = 
'online_catalog',", - " 'debezium.log.mining.continuous.mine' = 'true',", - " 'table-name' = 'products'", - ");", - "CREATE TABLE products_sink (", - " `id` INT NOT NULL,", - " name STRING,", - " description STRING,", - " weight DECIMAL(10,3),", - " primary key (`id`) not enforced", - ") WITH (", - " 'connector' = 'jdbc',", - String.format( - " 'url' = 'jdbc:mysql://%s:3306/%s',", - INTER_CONTAINER_MYSQL_ALIAS, - mysqlInventoryDatabase.getDatabaseName()), - " 'table-name' = 'products_sink',", - " 'username' = '" + MYSQL_TEST_USER + "',", - " 'password' = '" + MYSQL_TEST_PASSWORD + "'", - ");", - "INSERT INTO products_sink", - "SELECT * FROM products_source;"); - - submitSQLJob(sqlLines, oracleCdcJar, jdbcJar, mysqlDriverJar); - waitUntilJobRunning(Duration.ofSeconds(30)); - - // generate binlogs - Class.forName(ORACLE_DRIVER_CLASS); - // we need to set this property, otherwise Azure Pipeline will complain - // "ORA-01882: timezone region not found" error when building the Oracle JDBC connection - // see https://stackoverflow.com/a/9177263/4915129 - System.setProperty("oracle.jdbc.timezoneAsRegion", "false"); - try (Connection conn = getOracleJdbcConnection(); - Statement statement = conn.createStatement()) { - statement.execute( - "UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106"); - statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107"); - statement.execute( - "INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)"); - statement.execute( - "INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)"); - statement.execute( - "UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111"); - statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112"); - statement.execute("DELETE FROM debezium.products WHERE ID=112"); - } catch (SQLException e) { - LOG.error("Update table for CDC failed.", e); - throw e; - } - - // assert final results - String mysqlUrl = - String.format( - "jdbc:mysql://%s:%s/%s", - MYSQL.getHost(), - MYSQL.getDatabasePort(), - mysqlInventoryDatabase.getDatabaseName()); - JdbcProxy proxy = - new JdbcProxy(mysqlUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS); - List expectResult = - Arrays.asList( - "101,scooter,Small 2-wheel scooter,3.14", - "102,car battery,12V car battery,8.1", - "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8", - "104,hammer,12oz carpenters hammer,0.75", - "105,hammer,14oz carpenters hammer,0.875", - "106,hammer,18oz carpenter hammer,1.0", - "107,rocks,box of assorted rocks,5.1", - "108,jacket,water resistent black wind breaker,0.1", - "109,spare tire,24 inch spare tire,22.2", - "111,jacket,new water resistent white wind breaker,0.5"); - proxy.checkResultWithTimeout( - expectResult, - "products_sink", - new String[] {"id", "name", "description", "weight"}, - 150000L); - } - - @Test - public void testOracleCDCNew() throws Exception { - List sqlLines = - Arrays.asList( - "CREATE TABLE products_source (", - " ID INT NOT NULL,", - " NAME STRING,", - " DESCRIPTION STRING,", - " WEIGHT DECIMAL(10,3),", - " primary key (`ID`) not enforced", - ") WITH (", - " 'connector' = 'oracle-cdc-new',", - " 'hostname' = '" + INTER_CONTAINER_ORACLE_ALIAS + "',", - " 'port' = '" + ORACLE_PORT + "',", " 'username' = '" + ORACLE_SYSTEM_USER + "',", " 'password' = '" + ORACLE_SYSTEM_PASSWORD + "',", " 'database-name' = 'XE',", " 'schema-name' = 'DEBEZIUM',", " 
'debezium.log.mining.strategy' = 'online_catalog',", " 'debezium.log.mining.continuous.mine' = 'true',", - " 'table-name' = 'PRODUCTS'", + " 'table-name' = 'PRODUCTS',", + " 'scan.incremental.snapshot.chunk.size' = '4'", ");", "CREATE TABLE products_sink (", " `id` INT NOT NULL,", diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java index a4333a7f9..8fd2444a6 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java @@ -91,7 +91,6 @@ public class OracleDialect implements JdbcDataSourceDialect { public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) { return OracleConnectionUtils.createOracleConnection( sourceConfig.getDbzConnectorConfig().getJdbcConfig()); - // return JdbcDataSourceDialect.super.openJdbcConnection(sourceConfig); } @Override diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java index 3b97fbde4..47c30b296 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java @@ -18,7 +18,7 @@ package com.ververica.cdc.connectors.oracle.source; -import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource; @@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; *

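 * <p>For illustration, a minimal usage sketch (hedged: the builder entry point and setter
 * names below are the ones appearing elsewhere in this patch; the connection values and the
 * {@code deserializer} variable are placeholders, not values taken from this change):
 *
 * <pre>{@code
 * JdbcIncrementalSource<RowData> source =
 *         OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
 *                 .hostname("localhost")           // placeholder host
 *                 .port(1521)                      // default Oracle listener port
 *                 .databaseList("XE")
 *                 .schemaList("DEBEZIUM")
 *                 .tableList("DEBEZIUM.PRODUCTS")  // qualified as schema.table
 *                 .username("system")              // placeholder credentials
 *                 .password("oracle")
 *                 .deserializer(deserializer)      // e.g. a RowDataDebeziumDeserializeSchema
 *                 .build();
 * }</pre>
 *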
Check the Java docs of each individual method to learn more about the settings to build a * {@link OracleIncrementalSource}. */ -@Experimental +@Internal public class OracleSourceBuilder { private final OracleSourceConfigFactory configFactory = new OracleSourceConfigFactory(); private RedoLogOffsetFactory offsetFactory; @@ -219,5 +219,9 @@ public class OracleSourceBuilder { OracleDialect dataSourceDialect) { super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect); } + + public static OracleSourceBuilder builder() { + return new OracleSourceBuilder<>(); + } } } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java index 1f524b05e..e42d8be53 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java @@ -36,13 +36,9 @@ public class OracleSourceConfig extends JdbcSourceConfig { private static final long serialVersionUID = 1L; - private List schemaList; - private transient OracleConnectorConfig dbzOracleConfig; - public OracleSourceConfig( StartupOptions startupOptions, List databaseList, - List schemaList, List tableList, int splitSize, int splitMetaGroupSize, @@ -82,7 +78,6 @@ public class OracleSourceConfig extends JdbcSourceConfig { connectTimeout, connectMaxRetries, connectionPoolSize); - this.schemaList = schemaList; } @Override diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java index ce5060b11..703a5823b 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java @@ -88,7 +88,6 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory { return new OracleSourceConfig( startupOptions, databaseList, - schemaList, tableList, splitSize, splitMetaGroupSize, diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/events/LatestFinishedSplitsSizeRequestEvent.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceOptions.java similarity index 55% rename from flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/events/LatestFinishedSplitsSizeRequestEvent.java rename to flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceOptions.java index 2131445b8..a55c071b6 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/events/LatestFinishedSplitsSizeRequestEvent.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceOptions.java @@ -16,20 +16,20 @@ * limitations under the License. 
*/ -package com.ververica.cdc.connectors.oracle.source.meta.events; +package com.ververica.cdc.connectors.oracle.source.config; -import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; -import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator; -import com.ververica.cdc.connectors.base.source.reader.JdbcIncrementalSourceReader; +import com.ververica.cdc.connectors.base.options.JdbcSourceOptions; +import com.ververica.cdc.connectors.oracle.OracleSource; -/** - * The {@link SourceEvent} that {@link JdbcIncrementalSourceReader} sends to {@link - * IncrementalSourceEnumerator} to ask the latest finished snapshot splits size. - */ -public class LatestFinishedSplitsSizeRequestEvent implements SourceEvent { - - private static final long serialVersionUID = 1L; +/** Configurations for {@link OracleSource}. */ +public class OracleSourceOptions extends JdbcSourceOptions { - public LatestFinishedSplitsSizeRequestEvent() {} + public static final ConfigOption SCHEMA_NAME = + ConfigOptions.key("schema-name") + .stringType() + .noDefaultValue() + .withDescription("Schema name of the Oracle database to monitor."); } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/events/LatestFinishedSplitsSizeEvent.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/events/LatestFinishedSplitsSizeEvent.java deleted file mode 100644 index 04f141fd8..000000000 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/events/LatestFinishedSplitsSizeEvent.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oracle.source.meta.events; - -import org.apache.flink.api.connector.source.SourceEvent; - -import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource; -import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator; - -/** - * The {@link SourceEvent} that {@link IncrementalSourceEnumerator} sends to {@link - * JdbcIncrementalSource} to pass the latest finished snapshot splits size. 
- */ -public class LatestFinishedSplitsSizeEvent implements SourceEvent { - - private static final long serialVersionUID = 1L; - private final int latestFinishedSplitsSize; - - public LatestFinishedSplitsSizeEvent(int latestFinishedSplitsSize) { - this.latestFinishedSplitsSize = latestFinishedSplitsSize; - } - - public int getLatestFinishedSplitsSize() { - return latestFinishedSplitsSize; - } - - @Override - public String toString() { - return "LatestFinishedSplitsSizeEvent{" - + "latestFinishedSplitsSize=" - + latestFinishedSplitsSize - + '}'; - } -} diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleRedoLogSplit.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleRedoLogSplit.java deleted file mode 100644 index a8a7f86fc..000000000 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleRedoLogSplit.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oracle.source.meta.split; - -import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; -import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; -import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges; - -import javax.annotation.Nullable; - -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** The split to describe the redo log of Oracle table(s). 
*/ -public class OracleRedoLogSplit extends OracleSplit { - - private final RedoLogOffset startingOffset; - private final RedoLogOffset endingOffset; - private final List finishedSnapshotSplitInfos; - private final Map tableSchemas; - private final int totalFinishedSplitSize; - private final boolean isSuspended; - @Nullable transient byte[] serializedFormCache; - - public OracleRedoLogSplit( - String splitId, - RedoLogOffset startingOffset, - RedoLogOffset endingOffset, - List finishedSnapshotSplitInfos, - Map tableSchemas, - int totalFinishedSplitSize, - boolean isSuspended) { - super(splitId); - this.startingOffset = startingOffset; - this.endingOffset = endingOffset; - this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos; - this.tableSchemas = tableSchemas; - this.totalFinishedSplitSize = totalFinishedSplitSize; - this.isSuspended = isSuspended; - } - - public OracleRedoLogSplit( - String splitId, - RedoLogOffset startingOffset, - RedoLogOffset endingOffset, - List finishedSnapshotSplitInfos, - Map tableSchemas, - int totalFinishedSplitSize) { - super(splitId); - this.startingOffset = startingOffset; - this.endingOffset = endingOffset; - this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos; - this.tableSchemas = tableSchemas; - this.totalFinishedSplitSize = totalFinishedSplitSize; - this.isSuspended = false; - } - - public RedoLogOffset getStartingOffset() { - return startingOffset; - } - - public RedoLogOffset getEndingOffset() { - return endingOffset; - } - - public List getFinishedSnapshotSplitInfos() { - return finishedSnapshotSplitInfos; - } - - @Override - public Map getTableSchemas() { - return tableSchemas; - } - - public int getTotalFinishedSplitSize() { - return totalFinishedSplitSize; - } - - public boolean isSuspended() { - return isSuspended; - } - - public boolean isCompletedSplit() { - return totalFinishedSplitSize == finishedSnapshotSplitInfos.size(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof OracleRedoLogSplit)) { - return false; - } - if (!super.equals(o)) { - return false; - } - OracleRedoLogSplit that = (OracleRedoLogSplit) o; - return totalFinishedSplitSize == that.totalFinishedSplitSize - && isSuspended == that.isSuspended - && Objects.equals(startingOffset, that.startingOffset) - && Objects.equals(endingOffset, that.endingOffset) - && Objects.equals(finishedSnapshotSplitInfos, that.finishedSnapshotSplitInfos) - && Objects.equals(tableSchemas, that.tableSchemas); - } - - @Override - public int hashCode() { - return Objects.hash( - super.hashCode(), - startingOffset, - endingOffset, - finishedSnapshotSplitInfos, - tableSchemas, - totalFinishedSplitSize, - isSuspended); - } - - @Override - public String toString() { - return "OracleRedoLogSplit{" - + "splitId='" - + splitId - + '\'' - + ", offset=" - + startingOffset - + ", endOffset=" - + endingOffset - + ", isSuspended=" - + isSuspended - + '}'; - } -} diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSnapshotSplit.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSnapshotSplit.java deleted file mode 100644 index b2eadeb7c..000000000 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSnapshotSplit.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oracle.source.meta.split; - -import org.apache.flink.table.types.logical.RowType; - -import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; -import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges; - -import javax.annotation.Nullable; - -import java.util.Arrays; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -/** The split to describe a split of an Oracle table snapshot. */ -public class OracleSnapshotSplit extends OracleSplit { - - private final TableId tableId; - private final RowType splitKeyType; - private final Map tableSchemas; - - @Nullable private final Object[] splitStart; - @Nullable private final Object[] splitEnd; - /** The high watermark is not bull when the split read finished. */ - @Nullable private final RedoLogOffset highWatermark; - - @Nullable transient byte[] serializedFormCache; - - public OracleSnapshotSplit( - String splitId, - TableId tableId, - RowType splitKeyType, - Map tableSchemas, - @Nullable Object[] splitStart, - @Nullable Object[] splitEnd, - @Nullable RedoLogOffset highWatermark) { - super(splitId); - this.tableId = tableId; - this.splitKeyType = splitKeyType; - this.tableSchemas = tableSchemas; - this.splitStart = splitStart; - this.splitEnd = splitEnd; - this.highWatermark = highWatermark; - } - - @Override - public Map getTableSchemas() { - return tableSchemas; - } - - public TableId getTableId() { - return tableId; - } - - @Nullable - public Object[] getSplitStart() { - return splitStart; - } - - @Nullable - public Object[] getSplitEnd() { - return splitEnd; - } - - @Nullable - public RedoLogOffset getHighWatermark() { - return highWatermark; - } - - public boolean isSnapshotReadFinished() { - return highWatermark != null; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - OracleSnapshotSplit that = (OracleSnapshotSplit) o; - return Objects.equals(tableId, that.tableId) - && Objects.equals(splitKeyType, that.splitKeyType) - && Arrays.equals(splitStart, that.splitStart) - && Arrays.equals(splitEnd, that.splitEnd) - && Objects.equals(highWatermark, that.highWatermark); - } - - public RowType getSplitKeyType() { - return splitKeyType; - } - - @Override - public int hashCode() { - int result = Objects.hash(super.hashCode(), tableId, splitKeyType, highWatermark); - result = 31 * result + Arrays.hashCode(splitStart); - result = 31 * result + Arrays.hashCode(splitEnd); - result = 31 * result + Arrays.hashCode(serializedFormCache); - return result; - } - - @Override - public String toString() { - String splitKeyTypeSummary = - splitKeyType.getFields().stream() - 
.map(RowType.RowField::asSummaryString) - .collect(Collectors.joining(",", "[", "]")); - return "OracleSnapshotSplit{" - + "tableId=" - + tableId - + ", splitId='" - + splitId - + '\'' - + ", splitKeyType=" - + splitKeyTypeSummary - + ", splitStart=" - + Arrays.toString(splitStart) - + ", splitEnd=" - + Arrays.toString(splitEnd) - + ", highWatermark=" - + highWatermark - + '}'; - } -} diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSourceSplitSerializer.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSourceSplitSerializer.java deleted file mode 100644 index 5dcede6b2..000000000 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSourceSplitSerializer.java +++ /dev/null @@ -1,198 +0,0 @@ -/// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// package com.ververica.cdc.connectors.oracle.source.meta.split; -// -// import org.apache.flink.core.memory.DataInputDeserializer; -// import org.apache.flink.core.memory.DataOutputSerializer; -// import org.apache.flink.table.types.logical.RowType; -// import org.apache.flink.table.types.logical.utils.LogicalTypeParser; -// -// import com.ververica.cdc.connectors.base.source.meta.offset.Offset; -// import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory; -// import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; -// import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit; -// import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; -// import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer; -// import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit; -// import com.ververica.cdc.connectors.base.utils.SerializerUtils; -// import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory; -// import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer; -// import io.debezium.document.Document; -// import io.debezium.document.DocumentReader; -// import io.debezium.relational.TableId; -// import io.debezium.relational.history.TableChanges; -// -// import java.io.IOException; -// import java.nio.charset.StandardCharsets; -// import java.util.ArrayList; -// import java.util.HashMap; -// import java.util.List; -// import java.util.Map; -// -/// ** An Oracle serializer for the {@link SourceSplitBase}. 
*/ -// public class OracleSourceSplitSerializer extends SourceSplitSerializer { -// -// private static final int SNAPSHOT_SPLIT_FLAG = 1; -// private static final int REDOLOG_SPLIT_FLAG = 2; -// RedoLogOffsetFactory offsetFactory; -// -// public OracleSourceSplitSerializer(RedoLogOffsetFactory offsetFactory) { -// this.offsetFactory = offsetFactory; -// } -// -// @Override -// public OffsetFactory getOffsetFactory() { -// return offsetFactory; -// } -// -// @Override -// public Offset readOffsetPosition(int offsetVersion, DataInputDeserializer in) -// throws IOException { -// return super.readOffsetPosition(offsetVersion, in); -// } -// -// @Override -// public Offset readOffsetPosition(DataInputDeserializer in) throws IOException { -// return super.readOffsetPosition(in); -// } -// -// @Override -// public void writeOffsetPosition(Offset offset, DataOutputSerializer out) throws IOException { -// super.writeOffsetPosition(offset, out); -// } -// -// @Override -// public OffsetDeserializer createOffsetDeserializer() { -// return super.createOffsetDeserializer(); -// } -// -// @Override -// public FinishedSnapshotSplitInfo deserialize(byte[] serialized) { -// return super.deserialize(serialized); -// } -// -// @Override -// public byte[] serialize(SourceSplitBase split) throws IOException { -// return super.serialize(split); -// } -// -// @Override -// public SourceSplitBase deserialize(int version, byte[] serialized) throws IOException { -// return super.deserialize(version, serialized); -// } -// -// @Override -// public SourceSplitBase deserializeSplit(int version, byte[] serialized) throws IOException { -// final DataInputDeserializer in = new DataInputDeserializer(serialized); -// -// int splitKind = in.readInt(); -// if (splitKind == SNAPSHOT_SPLIT_FLAG) { -// TableId tableId = TableId.parse(in.readUTF(), false); -// String splitId = in.readUTF(); -// RowType splitKeyType = (RowType) LogicalTypeParser.parse(in.readUTF()); -// Object[] splitBoundaryStart = SerializerUtils.serializedStringToRow(in.readUTF()); -// Object[] splitBoundaryEnd = SerializerUtils.serializedStringToRow(in.readUTF()); -// Offset highWatermark = readOffsetPosition(version, in); -// Map tableSchemas = readTableSchemas(version, in); -// -// return new SnapshotSplit( -// tableId, -// splitId, -// splitKeyType, -// splitBoundaryStart, -// splitBoundaryEnd, -// highWatermark, -// tableSchemas); -// } else if (splitKind == REDOLOG_SPLIT_FLAG) { -// String splitId = in.readUTF(); -// // skip split Key Type -// in.readUTF(); -// Offset startingOffset = readOffsetPosition(version, in); -// Offset endingOffset = readOffsetPosition(version, in); -// List finishedSplitsInfo = -// readFinishedSplitsInfo(version, in); -// Map tableChangeMap = readTableSchemas(version, in); -// int totalFinishedSplitSize = finishedSplitsInfo.size(); -// if (version == 3) { -// totalFinishedSplitSize = in.readInt(); -// } -// in.releaseArrays(); -// return new StreamSplit( -// splitId, -// startingOffset, -// endingOffset, -// finishedSplitsInfo, -// tableChangeMap, -// totalFinishedSplitSize); -// } else { -// throw new IOException("Unknown split kind: " + splitKind); -// } -// } -// -// private List readFinishedSplitsInfo( -// int version, DataInputDeserializer in) throws IOException { -// List finishedSplitsInfo = new ArrayList<>(); -// final int size = in.readInt(); -// for (int i = 0; i < size; i++) { -// TableId tableId = TableId.parse(in.readUTF(), false); -// String splitId = in.readUTF(); -// Object[] splitStart = 
SerializerUtils.serializedStringToRow(in.readUTF()); -// Object[] splitEnd = SerializerUtils.serializedStringToRow(in.readUTF()); -// OffsetFactory offsetFactory = -// (OffsetFactory) SerializerUtils.serializedStringToObject(in.readUTF()); -// Offset highWatermark = readOffsetPosition(version, in); -// -// finishedSplitsInfo.add( -// new FinishedSnapshotSplitInfo( -// tableId, splitId, splitStart, splitEnd, highWatermark, -// offsetFactory)); -// } -// return finishedSplitsInfo; -// } -// -// private static Map readTableSchemas( -// int version, DataInputDeserializer in) throws IOException { -// DocumentReader documentReader = DocumentReader.defaultReader(); -// Map tableSchemas = new HashMap<>(); -// final int size = in.readInt(); -// for (int i = 0; i < size; i++) { -// TableId tableId = TableId.parse(in.readUTF(), false); -// final String tableChangeStr; -// switch (version) { -// case 1: -// tableChangeStr = in.readUTF(); -// break; -// case 2: -// case 3: -// final int len = in.readInt(); -// final byte[] bytes = new byte[len]; -// in.read(bytes); -// tableChangeStr = new String(bytes, StandardCharsets.UTF_8); -// break; -// default: -// throw new IOException("Unknown version: " + version); -// } -// Document document = documentReader.read(tableChangeStr); -// TableChanges.TableChange tableChange = -// FlinkJsonTableChangeSerializer.fromDocument(document, false); -// tableSchemas.put(tableId, tableChange); -// } -// return tableSchemas; -// } -// } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSplit.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSplit.java deleted file mode 100644 index 8fb458802..000000000 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/meta/split/OracleSplit.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oracle.source.meta.split; - -import org.apache.flink.api.connector.source.SourceSplit; - -import io.debezium.relational.TableId; -import io.debezium.relational.history.TableChanges; - -import java.util.Map; -import java.util.Objects; - -/** The split of table comes from a Table that splits by primary key. */ -public abstract class OracleSplit implements SourceSplit { - - protected final String splitId; - - public OracleSplit(String splitId) { - this.splitId = splitId; - } - - @Override - public String splitId() { - return splitId; - } - - /** Checks whether this split is a snapshot split. */ - public final boolean isSnapshotSplit() { - return getClass() == OracleSnapshotSplit.class; - } - - /** Checks whether this split is a redo log split. 
*/ - public final boolean isBinlogSplit() { - return getClass() == OracleRedoLogSplit.class; - } - - /** Casts this split into a {@link OracleSnapshotSplit}. */ - public final OracleSnapshotSplit asSnapshotSplit() { - return (OracleSnapshotSplit) this; - } - - /** Casts this split into a {@link OracleRedoLogSplit}. */ - public final OracleRedoLogSplit asRedoLogSplit() { - return (OracleRedoLogSplit) this; - } - - public abstract Map getTableSchemas(); - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - OracleSplit that = (OracleSplit) o; - return Objects.equals(splitId, that.splitId); - } - - @Override - public int hashCode() { - return Objects.hash(splitId); - } -} diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index a761c88bc..251f9d91a 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -193,7 +193,7 @@ public class OracleScanFetchTask implements FetchTask { sourcePartition, backFillBinlogSplit, backFillBinlogSplit.getEndingOffset(), - WatermarkKind.BINLOG_END); + WatermarkKind.END); } /** A wrapped task to fetch snapshot split of table. */ diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java index 55cf8a35d..3a47ad083 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java @@ -143,7 +143,7 @@ public class OracleStreamFetchTask implements FetchTask { offsetContext.getPartition(), redoLogSplit, currentRedoLogOffset, - JdbcSourceEventDispatcher.WatermarkKind.BINLOG_END); + JdbcSourceEventDispatcher.WatermarkKind.END); } catch (InterruptedException e) { LOG.error("Send signal event error.", e); errorHandler.setProducerThrowable( diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/table/OracleTableSource.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/table/OracleTableSource.java deleted file mode 100644 index 7a1e88d15..000000000 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/table/OracleTableSource.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oracle.source.table; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.SourceProvider; -import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; - -import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource; -import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder; -import com.ververica.cdc.connectors.oracle.table.OracleDeserializationConverterFactory; -import com.ververica.cdc.connectors.oracle.table.OracleReadableMetaData; -import com.ververica.cdc.connectors.oracle.table.StartupOptions; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import com.ververica.cdc.debezium.table.MetadataConverter; -import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A {@link DynamicTableSource} that describes how to create a Oracle binlog from a logical - * description. - */ -public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata { - - private final ResolvedSchema physicalSchema; - private final int port; - private final String hostname; - private final String database; - private final String username; - private final String password; - private final String tableName; - private final String schemaName; - private final Properties dbzProperties; - private final StartupOptions startupOptions; - - // -------------------------------------------------------------------------------------------- - // Mutable attributes - // -------------------------------------------------------------------------------------------- - - /** Data type that describes the final output of the source. */ - protected DataType producedDataType; - - /** Metadata that is appended at the end of a physical source row. 
*/ - protected List metadataKeys; - - public OracleTableSource( - ResolvedSchema physicalSchema, - int port, - String hostname, - String database, - String tableName, - String schemaName, - String username, - String password, - Properties dbzProperties, - StartupOptions startupOptions) { - this.physicalSchema = physicalSchema; - this.port = port; - this.hostname = checkNotNull(hostname); - this.database = checkNotNull(database); - this.tableName = checkNotNull(tableName); - this.schemaName = checkNotNull(schemaName); - this.username = checkNotNull(username); - this.password = checkNotNull(password); - this.dbzProperties = dbzProperties; - this.startupOptions = startupOptions; - this.producedDataType = physicalSchema.toPhysicalRowDataType(); - this.metadataKeys = Collections.emptyList(); - } - - @Override - public ChangelogMode getChangelogMode() { - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.UPDATE_BEFORE) - .addContainedKind(RowKind.UPDATE_AFTER) - .addContainedKind(RowKind.DELETE) - .build(); - } - - @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { - - RowType physicalDataType = - (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); - MetadataConverter[] metadataConverters = getMetadataConverters(); - TypeInformation typeInfo = scanContext.createTypeInformation(producedDataType); - - DebeziumDeserializationSchema deserializer = - RowDataDebeziumDeserializeSchema.newBuilder() - .setPhysicalRowType(physicalDataType) - .setMetadataConverters(metadataConverters) - .setResultTypeInfo(typeInfo) - .setUserDefinedConverterFactory( - OracleDeserializationConverterFactory.instance()) - .build(); - JdbcIncrementalSource oracleChangeEventSource = - new OracleSourceBuilder() - .hostname(hostname) - .port(port) - .databaseList(database) - .schemaList(schemaName) - .tableList(schemaName + "." 
+ tableName) - .username(username) - .password(password) - .deserializer(deserializer) - .includeSchemaChanges(true) - .debeziumProperties(dbzProperties) - .build(); - - return SourceProvider.of(oracleChangeEventSource); - } - - private MetadataConverter[] getMetadataConverters() { - if (metadataKeys.isEmpty()) { - return new MetadataConverter[0]; - } - - return metadataKeys.stream() - .map( - key -> - Stream.of(OracleReadableMetaData.values()) - .filter(m -> m.getKey().equals(key)) - .findFirst() - .orElseThrow(IllegalStateException::new)) - .map(OracleReadableMetaData::getConverter) - .toArray(MetadataConverter[]::new); - } - - @Override - public DynamicTableSource copy() { - OracleTableSource source = - new OracleTableSource( - physicalSchema, - port, - hostname, - database, - tableName, - schemaName, - username, - password, - dbzProperties, - startupOptions); - source.metadataKeys = metadataKeys; - source.producedDataType = producedDataType; - return source; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - OracleTableSource that = (OracleTableSource) o; - return port == that.port - && Objects.equals(physicalSchema, that.physicalSchema) - && Objects.equals(hostname, that.hostname) - && Objects.equals(database, that.database) - && Objects.equals(username, that.username) - && Objects.equals(password, that.password) - && Objects.equals(tableName, that.tableName) - && Objects.equals(schemaName, that.schemaName) - && Objects.equals(dbzProperties, that.dbzProperties) - && Objects.equals(startupOptions, that.startupOptions) - && Objects.equals(producedDataType, that.producedDataType) - && Objects.equals(metadataKeys, that.metadataKeys); - } - - @Override - public int hashCode() { - return Objects.hash( - physicalSchema, - port, - hostname, - database, - username, - password, - tableName, - schemaName, - dbzProperties, - startupOptions, - producedDataType, - metadataKeys); - } - - @Override - public String asSummaryString() { - return "Oracle-CDC"; - } - - @Override - public Map listReadableMetadata() { - return Stream.of(OracleReadableMetaData.values()) - .collect( - Collectors.toMap( - OracleReadableMetaData::getKey, - OracleReadableMetaData::getDataType)); - } - - @Override - public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { - this.metadataKeys = metadataKeys; - this.producedDataType = producedDataType; - } -} diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/table/OracleTableSourceFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/table/OracleTableSourceFactory.java deleted file mode 100644 index b1a47f611..000000000 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/table/OracleTableSourceFactory.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.oracle.source.table; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.factories.DynamicTableSourceFactory; -import org.apache.flink.table.factories.FactoryUtil; - -import com.ververica.cdc.connectors.oracle.table.StartupOptions; -import com.ververica.cdc.debezium.table.DebeziumOptions; - -import java.util.HashSet; -import java.util.Set; - -import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; - -/** Factory for creating configured instance of {@link OracleTableSource}. */ -public class OracleTableSourceFactory implements DynamicTableSourceFactory { - - private static final String IDENTIFIER = "oracle-cdc-new"; - - private static final ConfigOption HOSTNAME = - ConfigOptions.key("hostname") - .stringType() - .noDefaultValue() - .withDescription("IP address or hostname of the Oracle database server."); - - private static final ConfigOption PORT = - ConfigOptions.key("port") - .intType() - .defaultValue(1521) - .withDescription("Integer port number of the Oracle database server."); - - private static final ConfigOption USERNAME = - ConfigOptions.key("username") - .stringType() - .noDefaultValue() - .withDescription( - "Name of the Oracle database to use when connecting to the Oracle database server."); - - private static final ConfigOption PASSWORD = - ConfigOptions.key("password") - .stringType() - .noDefaultValue() - .withDescription( - "Password to use when connecting to the oracle database server."); - - private static final ConfigOption DATABASE_NAME = - ConfigOptions.key("database-name") - .stringType() - .noDefaultValue() - .withDescription("Database name of the Oracle server to monitor."); - - private static final ConfigOption SCHEMA_NAME = - ConfigOptions.key("schema-name") - .stringType() - .noDefaultValue() - .withDescription("Schema name of the Oracle database to monitor."); - - private static final ConfigOption TABLE_NAME = - ConfigOptions.key("table-name") - .stringType() - .noDefaultValue() - .withDescription("Table name of the Oracle database to monitor."); - - public static final ConfigOption SCAN_STARTUP_MODE = - ConfigOptions.key("scan.startup.mode") - .stringType() - .defaultValue("initial") - .withDescription( - "Optional startup mode for Oracle CDC consumer, valid enumerations are " - + "\"initial\", \"latest-offset\""); - - @Override - public DynamicTableSource createDynamicTableSource(Context context) { - final FactoryUtil.TableFactoryHelper helper = - FactoryUtil.createTableFactoryHelper(this, context); - helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX); - - final ReadableConfig config = helper.getOptions(); - String hostname = config.get(HOSTNAME); - String username = config.get(USERNAME); - String password = config.get(PASSWORD); - String 
databaseName = config.get(DATABASE_NAME); - String tableName = config.get(TABLE_NAME); - String schemaName = config.get(SCHEMA_NAME); - int port = config.get(PORT); - StartupOptions startupOptions = getStartupOptions(config); - ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); - - return new OracleTableSource( - physicalSchema, - port, - hostname, - databaseName, - tableName, - schemaName, - username, - password, - getDebeziumProperties(context.getCatalogTable().getOptions()), - startupOptions); - } - - @Override - public String factoryIdentifier() { - return IDENTIFIER; - } - - @Override - public Set> requiredOptions() { - Set> options = new HashSet<>(); - options.add(HOSTNAME); - options.add(USERNAME); - options.add(PASSWORD); - options.add(DATABASE_NAME); - options.add(TABLE_NAME); - options.add(SCHEMA_NAME); - return options; - } - - @Override - public Set> optionalOptions() { - Set> options = new HashSet<>(); - options.add(PORT); - options.add(SCAN_STARTUP_MODE); - - return options; - } - - private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; - private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; - - private static StartupOptions getStartupOptions(ReadableConfig config) { - String modeString = config.get(SCAN_STARTUP_MODE); - - switch (modeString.toLowerCase()) { - case SCAN_STARTUP_MODE_VALUE_INITIAL: - return StartupOptions.initial(); - - case SCAN_STARTUP_MODE_VALUE_LATEST: - return StartupOptions.latest(); - - default: - throw new ValidationException( - String.format( - "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s", - SCAN_STARTUP_MODE.key(), - SCAN_STARTUP_MODE_VALUE_INITIAL, - SCAN_STARTUP_MODE_VALUE_LATEST, - modeString)); - } - } -} diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/SerializerUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/SerializerUtils.java deleted file mode 100644 index d18e5f876..000000000 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/SerializerUtils.java +++ /dev/null @@ -1,125 +0,0 @@ -/// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. 
-// */ -// -// package com.ververica.cdc.connectors.oracle.source.utils; -// -// import org.apache.flink.core.memory.DataInputDeserializer; -// import org.apache.flink.core.memory.DataOutputSerializer; -// -// import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; -// import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetSerializer; -// import io.debezium.DebeziumException; -// import io.debezium.util.HexConverter; -// -// import java.io.ByteArrayInputStream; -// import java.io.ByteArrayOutputStream; -// import java.io.IOException; -// import java.io.ObjectInputStream; -// import java.io.ObjectOutputStream; -// -/// ** Utils for serialization and deserialization. */ -// public class SerializerUtils { -// -// private SerializerUtils() {} -// -// public static void writeRedoLogPosition(RedoLogOffset offset, DataOutputSerializer out) -// throws IOException { -// out.writeBoolean(offset != null); -// if (offset != null) { -// byte[] redoLogOffsetBytes = RedoLogOffsetSerializer.INSTANCE.serialize(offset); -// out.writeInt(redoLogOffsetBytes.length); -// out.write(redoLogOffsetBytes); -// } -// } -// -// public static RedoLogOffset readRedoLogPosition(int offsetVersion, DataInputDeserializer in) -// throws IOException { -// switch (offsetVersion) { -// case 1: -//// return in.readBoolean() ? new RedoLogOffset(in.readUTF(), in.readLong()) : null; -// case 2: -// case 3: -// case 4: -// return readRedoLogPosition(in); -// default: -// throw new IOException("Unknown version: " + offsetVersion); -// } -// } -// -// public static RedoLogOffset readRedoLogPosition(DataInputDeserializer in) throws IOException { -// boolean offsetNonNull = in.readBoolean(); -// if (offsetNonNull) { -// int redoLogOffsetBytesLength = in.readInt(); -// byte[] redoLogOffsetBytes = new byte[redoLogOffsetBytesLength]; -// in.readFully(redoLogOffsetBytes); -// return RedoLogOffsetSerializer.INSTANCE.deserialize(redoLogOffsetBytes); -// } else { -// return null; -// } -// } -// -// public static String rowToSerializedString(Object[] splitBoundary) { -// try (final ByteArrayOutputStream bos = new ByteArrayOutputStream(); -// ObjectOutputStream oos = new ObjectOutputStream(bos)) { -// oos.writeObject(splitBoundary); -// return HexConverter.convertToHexString(bos.toByteArray()); -// } catch (IOException e) { -// throw new DebeziumException( -// String.format("Cannot serialize split boundary information %s", -// splitBoundary)); -// } -// } -// -// public static String rowToSerializedString(Object splitBoundary) { -// try (final ByteArrayOutputStream bos = new ByteArrayOutputStream(); -// ObjectOutputStream oos = new ObjectOutputStream(bos)) { -// oos.writeObject(splitBoundary); -// return HexConverter.convertToHexString(bos.toByteArray()); -// } catch (IOException e) { -// throw new DebeziumException( -// String.format("Cannot serialize split boundary information %s", -// splitBoundary)); -// } -// } -// -// public static Object[] serializedStringToRow(String serialized) { -// try (final ByteArrayInputStream bis = -// new ByteArrayInputStream(HexConverter.convertFromHex(serialized)); -// ObjectInputStream ois = new ObjectInputStream(bis)) { -// return (Object[]) ois.readObject(); -// } catch (Exception e) { -// throw new DebeziumException( -// String.format( -// "Failed to deserialize split boundary with value '%s'", serialized), -// e); -// } -// } -// -// public static Object serializedStringToObject(String serialized) { -// try (final ByteArrayInputStream bis = -// new 
ByteArrayInputStream(HexConverter.convertFromHex(serialized)); -// ObjectInputStream ois = new ObjectInputStream(bis)) { -// return ois.readObject(); -// } catch (Exception e) { -// throw new DebeziumException( -// String.format( -// "Failed to deserialize split boundary with value '%s'", serialized), -// e); -// } -// } -// } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java index ee849be61..d6067f2eb 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java @@ -22,20 +22,21 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource; import com.ververica.cdc.connectors.oracle.OracleSource; +import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.table.MetadataConverter; import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; -import javax.annotation.Nullable; - import java.util.Collections; import java.util.List; import java.util.Map; @@ -53,8 +54,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata { private final ResolvedSchema physicalSchema; - private final String url; - private final Integer port; + private final int port; private final String hostname; private final String database; private final String username; @@ -63,6 +63,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada private final String schemaName; private final Properties dbzProperties; private final StartupOptions startupOptions; + private final boolean enableParallelRead; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -76,20 +77,19 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada public OracleTableSource( ResolvedSchema physicalSchema, - @Nullable String url, - @Nullable Integer port, - @Nullable String hostname, + int port, + String hostname, String database, String tableName, String schemaName, String username, String password, Properties dbzProperties, - StartupOptions startupOptions) { + StartupOptions startupOptions, + boolean enableParallelRead) { this.physicalSchema = physicalSchema; - this.url = url; this.port = port; - this.hostname = hostname; + this.hostname = checkNotNull(hostname); this.database = checkNotNull(database); this.tableName = checkNotNull(tableName); this.schemaName = checkNotNull(schemaName); @@ -99,6 +99,7 @@ public class 
OracleTableSource implements ScanTableSource, SupportsReadingMetada this.startupOptions = startupOptions; this.producedDataType = physicalSchema.toPhysicalRowDataType(); this.metadataKeys = Collections.emptyList(); + this.enableParallelRead = enableParallelRead; } @Override @@ -113,6 +114,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + RowType physicalDataType = (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); MetadataConverter[] metadataConverters = getMetadataConverters(); @@ -126,22 +128,39 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada .setUserDefinedConverterFactory( OracleDeserializationConverterFactory.instance()) .build(); - OracleSource.Builder<RowData> builder = - OracleSource.<RowData>builder() - .url(url) - .hostname(hostname) - .port(port) - .database(database) - .tableList(schemaName + "." + tableName) - .schemaList(schemaName) - .username(username) - .password(password) - .debeziumProperties(dbzProperties) - .startupOptions(startupOptions) - .deserializer(deserializer); - DebeziumSourceFunction<RowData> sourceFunction = builder.build(); - - return SourceFunctionProvider.of(sourceFunction, false); + + if (enableParallelRead) { + JdbcIncrementalSource<RowData> oracleChangeEventSource = + OracleSourceBuilder.OracleIncrementalSource.<RowData>builder() + .hostname(hostname) + .port(port) + .databaseList(database) + .schemaList(schemaName) + .tableList(schemaName + "." + tableName) + .username(username) + .password(password) + .deserializer(deserializer) + .debeziumProperties(dbzProperties) + .build(); + + return SourceProvider.of(oracleChangeEventSource); + } else { + OracleSource.Builder<RowData> builder = + OracleSource.<RowData>builder() + .hostname(hostname) + .port(port) + .database(database) + .tableList(schemaName + "." + tableName) + .schemaList(schemaName) + .username(username) + .password(password) + .debeziumProperties(dbzProperties) + .startupOptions(startupOptions) + .deserializer(deserializer); + DebeziumSourceFunction<RowData> sourceFunction = builder.build(); + + return SourceFunctionProvider.of(sourceFunction, false); + } } private MetadataConverter[] getMetadataConverters() { @@ -165,7 +184,6 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada OracleTableSource source = new OracleTableSource( physicalSchema, - url, port, hostname, database, @@ -174,7 +192,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada username, password, dbzProperties, - startupOptions); + startupOptions, + enableParallelRead); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -191,7 +210,6 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada OracleTableSource that = (OracleTableSource) o; return Objects.equals(port, that.port) && Objects.equals(physicalSchema, that.physicalSchema) - && Objects.equals(url, that.url) && Objects.equals(hostname, that.hostname) && Objects.equals(database, that.database) && Objects.equals(username, that.username) @@ -201,14 +219,14 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada && Objects.equals(dbzProperties, that.dbzProperties) && Objects.equals(startupOptions, that.startupOptions) && Objects.equals(producedDataType, that.producedDataType) - && Objects.equals(metadataKeys, that.metadataKeys); + && Objects.equals(metadataKeys, that.metadataKeys) + && Objects.equals(enableParallelRead, that.enableParallelRead); } @Override public int hashCode() { return Objects.hash( physicalSchema, - url, port, hostname, database, diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java index 7bdcc60f8..00cd89f91 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java @@ -17,7 +17,6 @@ package com.ververica.cdc.connectors.oracle.table; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedSchema; @@ -30,76 +29,27 @@ import com.ververica.cdc.debezium.table.DebeziumOptions; import java.util.HashSet; import java.util.Set; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.DATABASE_NAME; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PORT; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME; +import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE; +import static com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions.SCHEMA_NAME; import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; -import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; -/** - * Factory for creating configured instance of {@link - * com.ververica.cdc.connectors.oracle.table.OracleTableSource}. - */ +/** Factory for creating configured instance of {@link OracleTableSource}. */ public class OracleTableSourceFactory implements DynamicTableSourceFactory { private static final String IDENTIFIER = "oracle-cdc"; - private static final ConfigOption HOSTNAME = - ConfigOptions.key("hostname") - .stringType() - .noDefaultValue() - .withDescription("IP address or hostname of the Oracle database server."); - - private static final ConfigOption PORT = - ConfigOptions.key("port") - .intType() - .defaultValue(1521) - .withDescription("Integer port number of the Oracle database server."); - - private static final ConfigOption URL = - ConfigOptions.key("url") - .stringType() - .noDefaultValue() - .withDescription( - "Complete JDBC URL as an alternative to specifying hostname, port and database provided as a way to support alternative connection scenarios."); - - private static final ConfigOption USERNAME = - ConfigOptions.key("username") - .stringType() - .noDefaultValue() - .withDescription( - "Name of the Oracle database to use when connecting to the Oracle database server."); - - private static final ConfigOption PASSWORD = - ConfigOptions.key("password") - .stringType() - .noDefaultValue() - .withDescription( - "Password to use when connecting to the oracle database server."); - - private static final ConfigOption DATABASE_NAME = - ConfigOptions.key("database-name") - .stringType() - .noDefaultValue() - .withDescription("Database name of the Oracle server to monitor."); - - private static final ConfigOption SCHEMA_NAME = - ConfigOptions.key("schema-name") - .stringType() - .noDefaultValue() - .withDescription("Schema name of the Oracle database to monitor."); - - private static final ConfigOption TABLE_NAME = - ConfigOptions.key("table-name") - .stringType() - .noDefaultValue() - .withDescription("Table name of the Oracle database to monitor."); - - public static final ConfigOption SCAN_STARTUP_MODE = - ConfigOptions.key("scan.startup.mode") - .stringType() - .defaultValue("initial") - .withDescription( - "Optional startup mode for Oracle CDC consumer, valid enumerations are " - + "\"initial\", \"latest-offset\""); - @Override public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = @@ -113,17 +63,25 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory { String databaseName = config.get(DATABASE_NAME); String tableName = config.get(TABLE_NAME); String schemaName = config.get(SCHEMA_NAME); - String url = config.get(URL); - Integer port = config.get(PORT); + int port = config.get(PORT); StartupOptions startupOptions = getStartupOptions(config); ResolvedSchema physicalSchema = 
context.getCatalogTable().getResolvedSchema(); - if (url == null) { - checkNotNull(hostname, "hostname is required when url is not configured"); - checkNotNull(port, "port is required when url is not configured"); + + boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); + int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); + int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE); + int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); + int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); + + if (enableParallelRead) { + validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); + validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1); + validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1); + validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0); } + return new OracleTableSource( physicalSchema, - url, port, hostname, databaseName, @@ -132,7 +90,8 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory { username, password, getDebeziumProperties(context.getCatalogTable().getOptions()), - startupOptions); + startupOptions, + enableParallelRead); } @Override @@ -143,6 +102,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory { @Override public Set<ConfigOption<?>> requiredOptions() { Set<ConfigOption<?>> options = new HashSet<>(); + options.add(HOSTNAME); options.add(USERNAME); options.add(PASSWORD); options.add(DATABASE_NAME); @@ -154,11 +114,12 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory { @Override public Set<ConfigOption<?>> optionalOptions() { Set<ConfigOption<?>> options = new HashSet<>(); - options.add(HOSTNAME); options.add(PORT); - options.add(URL); options.add(SCAN_STARTUP_MODE); - + options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); + options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); + options.add(CONNECT_MAX_RETRIES); + options.add(CONNECTION_POOL_SIZE); return options; } @@ -185,4 +146,14 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory { modeString)); } } + + /** Checks that the value of the given integer option is valid. */ + private void validateIntegerOption( + ConfigOption<Integer> option, int optionValue, int exclusiveMin) { + checkState( + optionValue > exclusiveMin, + String.format( + "The value of option '%s' must be larger than %d, but is %d", + option.key(), exclusiveMin, optionValue)); + } } diff --git a/flink-connector-oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 1ec31a6a3..7418f73fa 100644 --- a/flink-connector-oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connector-oracle-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -12,4 +12,3 @@ # limitations under the License. 
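For reference, once the factory is registered, the new code path can be exercised from SQL by opting into the incremental snapshot. A minimal sketch, assuming the option keys exposed by the base connector's SourceOptions; the connection values are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OracleParallelReadSqlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'scan.incremental.snapshot.enabled' = 'true' routes the scan through the
        // OracleSourceBuilder-based parallel source instead of the legacy
        // DebeziumSourceFunction; 'false' keeps the old single-reader behavior.
        tEnv.executeSql(
                "CREATE TABLE products_source ("
                        + "  ID INT NOT NULL,"
                        + "  NAME STRING,"
                        + "  DESCRIPTION STRING,"
                        + "  WEIGHT DECIMAL(10,3),"
                        + "  primary key (ID) not enforced"
                        + ") WITH ("
                        + "  'connector' = 'oracle-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '1521',"
                        + "  'username' = 'flinkuser',"
                        + "  'password' = 'flinkpw',"
                        + "  'database-name' = 'XE',"
                        + "  'schema-name' = 'DEBEZIUM',"
                        + "  'table-name' = 'PRODUCTS',"
                        + "  'scan.incremental.snapshot.enabled' = 'true',"
                        // Both values below must pass validateIntegerOption (> 1).
                        + "  'scan.incremental.snapshot.chunk.size' = '8096',"
                        + "  'scan.snapshot.fetch.size' = '1024'"
                        + ")");
        tEnv.executeSql("SELECT * FROM products_source").print();
    }
}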
com.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory -com.ververica.cdc.connectors.oracle.source.table.OracleTableSourceFactory diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java index fc45a261c..99acf4fd2 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java @@ -138,7 +138,7 @@ public class OracleSourceITCase extends OracleSourceTestBase { + " PHONE_NUMBER STRING," + " primary key (ID) not enforced" + ") WITH (" - + " 'connector' = 'oracle-cdc-new'," + + " 'connector' = 'oracle-cdc'," + " 'hostname' = '%s'," + " 'port' = '%s'," + " 'username' = '%s'," @@ -263,7 +263,7 @@ public class OracleSourceITCase extends OracleSourceTestBase { while (size > 0 && iter.hasNext()) { Row row = iter.next(); rows.add(row.toString()); - LOG.info("fetch row:{}", row.toString()); + LOG.info("fetch row:{}", row); size--; } return rows; diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java index b4ca6f95c..1478832b0 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java @@ -27,15 +27,10 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.SourceFunctionProvider; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.util.ExceptionUtils; -import com.ververica.cdc.debezium.DebeziumSourceFunction; import org.junit.Test; import java.util.ArrayList; @@ -45,7 +40,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -81,6 +75,7 @@ public class OracleTableSourceFactoryTest { Collections.emptyList(), UniqueConstraint.primaryKey("pk", Collections.singletonList("id"))); + private static final int MY_PORT = 1521; private static final String MY_LOCALHOST = "localhost"; private static final String MY_USERNAME = "flinkuser"; private static final String MY_PASSWORD = "flinkpw"; @@ -91,41 +86,23 @@ public class OracleTableSourceFactoryTest { @Test public void testRequiredProperties() { - try { - Map properties = getAllRequiredOptions(); - - // validation for source - createTableSource(properties); - fail("exception expected"); - } catch (Throwable t) { - assertTrue( - ExceptionUtils.findThrowableWithMessage( - t, "hostname is 
required when url is not configured") - .isPresent()); - } - } - - @Test - public void testRequiredPropertiesWithUrl() { - String url = "jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE; Map properties = getAllRequiredOptions(); - properties.put("url", url); // validation for source DynamicTableSource actualSource = createTableSource(properties); OracleTableSource expectedSource = new OracleTableSource( SCHEMA, - url, - 1521, - null, + MY_PORT, + MY_LOCALHOST, MY_DATABASE, MY_TABLE, MY_SCHEMA, MY_USERNAME, MY_PASSWORD, PROPERTIES, - StartupOptions.initial()); + StartupOptions.initial(), + true); assertEquals(expectedSource, actualSource); } @@ -138,8 +115,7 @@ public class OracleTableSourceFactoryTest { OracleTableSource expectedSource = new OracleTableSource( SCHEMA, - null, - 1521, + MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, @@ -147,7 +123,8 @@ public class OracleTableSourceFactoryTest { MY_USERNAME, MY_PASSWORD, PROPERTIES, - StartupOptions.initial()); + StartupOptions.initial(), + true); assertEquals(expectedSource, actualSource); } @@ -156,7 +133,6 @@ public class OracleTableSourceFactoryTest { Map options = getAllRequiredOptions(); options.put("port", "1521"); options.put("hostname", MY_LOCALHOST); - options.put("url", "jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE); options.put("debezium.snapshot.mode", "initial"); DynamicTableSource actualSource = createTableSource(options); @@ -165,8 +141,7 @@ public class OracleTableSourceFactoryTest { OracleTableSource expectedSource = new OracleTableSource( SCHEMA, - "jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE, - 1521, + MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, @@ -174,7 +149,8 @@ public class OracleTableSourceFactoryTest { MY_USERNAME, MY_PASSWORD, dbzProperties, - StartupOptions.initial()); + StartupOptions.initial(), + true); assertEquals(expectedSource, actualSource); } @@ -188,8 +164,7 @@ public class OracleTableSourceFactoryTest { OracleTableSource expectedSource = new OracleTableSource( SCHEMA, - null, - 1521, + MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, @@ -197,7 +172,8 @@ public class OracleTableSourceFactoryTest { MY_USERNAME, MY_PASSWORD, PROPERTIES, - StartupOptions.initial()); + StartupOptions.initial(), + true); assertEquals(expectedSource, actualSource); } @@ -211,8 +187,7 @@ public class OracleTableSourceFactoryTest { OracleTableSource expectedSource = new OracleTableSource( SCHEMA, - null, - 1521, + MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, @@ -220,13 +195,14 @@ public class OracleTableSourceFactoryTest { MY_USERNAME, MY_PASSWORD, PROPERTIES, - StartupOptions.latest()); + StartupOptions.latest(), + true); assertEquals(expectedSource, actualSource); } @Test public void testMetadataColumns() { - Map properties = getAllRequiredOptionsWithHost(); + Map properties = getAllRequiredOptions(); // validation for source DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties); @@ -238,8 +214,7 @@ public class OracleTableSourceFactoryTest { OracleTableSource expectedSource = new OracleTableSource( SCHEMA_WITH_METADATA, - null, - 1521, + MY_PORT, MY_LOCALHOST, MY_DATABASE, MY_TABLE, @@ -247,19 +222,13 @@ public class OracleTableSourceFactoryTest { MY_USERNAME, MY_PASSWORD, new Properties(), - StartupOptions.initial()); + StartupOptions.initial(), + true); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name", "schema_name"); 
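A negative test along the following lines would pin down the new validation path; a sketch only, assuming the option keys used above and the message format of the factory's validateIntegerOption:

@Test
public void testValidateIncrementalSnapshotOptions() {
    Map<String, String> properties = getAllRequiredOptions();
    properties.put("scan.incremental.snapshot.enabled", "true");
    // The chunk size must be strictly greater than 1, so 1 should be rejected.
    properties.put("scan.incremental.snapshot.chunk.size", "1");
    try {
        createTableSource(properties);
        fail("exception expected");
    } catch (Throwable t) {
        assertTrue(
                ExceptionUtils.findThrowableWithMessage(
                                t,
                                "The value of option 'scan.incremental.snapshot.chunk.size' "
                                        + "must be larger than 1, but is 1")
                        .isPresent());
    }
}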
assertEquals(expectedSource, actualSource); - - ScanTableSource.ScanRuntimeProvider provider = - oracleTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); - DebeziumSourceFunction<RowData> debeziumSourceFunction = - (DebeziumSourceFunction<RowData>) - ((SourceFunctionProvider) provider).createSourceFunction(); - assertProducedTypeOfSourceFunction(debeziumSourceFunction, expectedSource.producedDataType); } @Test @@ -329,6 +298,8 @@ public class OracleTableSourceFactoryTest { private Map<String, String> getAllRequiredOptions() { Map<String, String> options = new HashMap<>(); options.put("connector", "oracle-cdc"); + options.put("port", "1521"); + options.put("hostname", MY_LOCALHOST); options.put("database-name", MY_DATABASE); options.put("table-name", MY_TABLE); options.put("username", MY_USERNAME);
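The same parallel source is also usable directly from the DataStream API. A minimal sketch modeled on the builder chain the table source uses when enableParallelRead is true; the JsonDebeziumDeserializationSchema and the explicit <String> type witness are illustrative assumptions, not part of this patch:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class OracleIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        JdbcIncrementalSource<String> oracleChangeEventSource =
                OracleSourceBuilder.OracleIncrementalSource.<String>builder()
                        .hostname("localhost")
                        .port(1521)
                        .databaseList("XE")
                        .schemaList("DEBEZIUM")
                        .tableList("DEBEZIUM.PRODUCTS")
                        .username("flinkuser")
                        .password("flinkpw")
                        // Emits each change event as a Debezium-style JSON string.
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // CDC sources rely on checkpoints to commit reading progress.
        env.enableCheckpointing(3000L);
        env.fromSource(
                        oracleChangeEventSource,
                        WatermarkStrategy.noWatermarks(),
                        "OracleParallelSource")
                .print();
        env.execute("Oracle CDC incremental snapshot example");
    }
}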