[oracle] Introduce SCAN_INCREMENTAL_SNAPSHOT_ENABLED option for OracleTableSourceFactory

pull/1590/merge
molsion authored 3 years ago, committed by Leonard Xu
parent a94eefd90a
commit 94fa8b347f

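For orientation: the option named in the commit title is the shared SCAN_INCREMENTAL_SNAPSHOT_ENABLED from com.ververica.cdc.connectors.base.options.SourceOptions, which the factory below imports. A minimal sketch of what such a ConfigOption definition looks like (the class name and default value here are assumptions for illustration, not taken from this diff):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class SourceOptionsSketch {
    // Sketch only: the real definition lives in
    // com.ververica.cdc.connectors.base.options.SourceOptions.
    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
            ConfigOptions.key("scan.incremental.snapshot.enabled")
                    .booleanType()
                    .defaultValue(false) // assumption: the legacy source stays the default
                    .withDescription(
                            "Whether to enable the parallel incremental snapshot source.");
}
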
@@ -91,109 +91,14 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
" 'connector' = 'oracle-cdc',",
" 'hostname' = '" + INTER_CONTAINER_ORACLE_ALIAS + "',",
" 'port' = '" + ORACLE_PORT + "',",
" 'username' = '" + ORACLE_TEST_USER + "',",
" 'password' = '" + ORACLE_TEST_PASSWORD + "',",
" 'database-name' = 'XE',",
" 'schema-name' = 'debezium',",
" 'debezium.log.mining.strategy' = 'online_catalog',",
" 'debezium.log.mining.continuous.mine' = 'true',",
" 'table-name' = 'products'",
");",
"CREATE TABLE products_sink (",
" `id` INT NOT NULL,",
" name STRING,",
" description STRING,",
" weight DECIMAL(10,3),",
" primary key (`id`) not enforced",
") WITH (",
" 'connector' = 'jdbc',",
String.format(
" 'url' = 'jdbc:mysql://%s:3306/%s',",
INTER_CONTAINER_MYSQL_ALIAS,
mysqlInventoryDatabase.getDatabaseName()),
" 'table-name' = 'products_sink',",
" 'username' = '" + MYSQL_TEST_USER + "',",
" 'password' = '" + MYSQL_TEST_PASSWORD + "'",
");",
"INSERT INTO products_sink",
"SELECT * FROM products_source;");
submitSQLJob(sqlLines, oracleCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
// generate redo logs
Class.forName(ORACLE_DRIVER_CLASS);
// we need to set this property, otherwise Azure Pipeline will complain with an
// "ORA-01882: timezone region not found" error when building the Oracle JDBC connection
// see https://stackoverflow.com/a/9177263/4915129
System.setProperty("oracle.jdbc.timezoneAsRegion", "false");
try (Connection conn = getOracleJdbcConnection();
Statement statement = conn.createStatement()) {
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
statement.execute(
"INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)");
statement.execute(
"INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
statement.execute(
"UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
statement.execute("DELETE FROM debezium.products WHERE ID=112");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
// assert final results
String mysqlUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
mysqlInventoryDatabase.getDatabaseName());
JdbcProxy proxy =
new JdbcProxy(mysqlUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS);
List<String> expectResult =
Arrays.asList(
"101,scooter,Small 2-wheel scooter,3.14",
"102,car battery,12V car battery,8.1",
"103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8",
"104,hammer,12oz carpenters hammer,0.75",
"105,hammer,14oz carpenters hammer,0.875",
"106,hammer,18oz carpenter hammer,1.0",
"107,rocks,box of assorted rocks,5.1",
"108,jacket,water resistent black wind breaker,0.1",
"109,spare tire,24 inch spare tire,22.2",
"111,jacket,new water resistent white wind breaker,0.5");
proxy.checkResultWithTimeout(
expectResult,
"products_sink",
new String[] {"id", "name", "description", "weight"},
150000L);
}
@Test
public void testOracleCDCNew() throws Exception {
List<String> sqlLines =
Arrays.asList(
"CREATE TABLE products_source (",
" ID INT NOT NULL,",
" NAME STRING,",
" DESCRIPTION STRING,",
" WEIGHT DECIMAL(10,3),",
" primary key (`ID`) not enforced",
") WITH (",
" 'connector' = 'oracle-cdc-new',",
" 'hostname' = '" + INTER_CONTAINER_ORACLE_ALIAS + "',",
" 'port' = '" + ORACLE_PORT + "',",
" 'username' = '" + ORACLE_SYSTEM_USER + "',",
" 'password' = '" + ORACLE_SYSTEM_PASSWORD + "',",
" 'database-name' = 'XE',",
" 'schema-name' = 'DEBEZIUM',",
" 'debezium.log.mining.strategy' = 'online_catalog',",
" 'debezium.log.mining.continuous.mine' = 'true',",
" 'table-name' = 'PRODUCTS'",
" 'table-name' = 'PRODUCTS',",
" 'scan.incremental.snapshot.chunk.size' = '4'",
");",
"CREATE TABLE products_sink (",
" `id` INT NOT NULL,",

@@ -91,7 +91,6 @@ public class OracleDialect implements JdbcDataSourceDialect {
public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
return OracleConnectionUtils.createOracleConnection(
sourceConfig.getDbzConnectorConfig().getJdbcConfig());
// return JdbcDataSourceDialect.super.openJdbcConnection(sourceConfig);
}
@Override

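Aside: the deleted comment in openJdbcConnection was a leftover fallback to the interface's default method; the dialect now always goes through OracleConnectionUtils. As a generic Java illustration of that override pattern (all names below are hypothetical stand-ins, not the project's classes):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

interface DialectSketch {
    String jdbcUrl();

    // Generic default path, analogous to JdbcDataSourceDialect.super.openJdbcConnection.
    default Connection openJdbcConnection() throws SQLException {
        return DriverManager.getConnection(jdbcUrl());
    }
}

class OracleDialectSketch implements DialectSketch {
    @Override
    public String jdbcUrl() {
        return "jdbc:oracle:thin:@//localhost:1521/XE"; // placeholder URL
    }

    // Vendor-specific override, standing in for OracleConnectionUtils.createOracleConnection.
    @Override
    public Connection openJdbcConnection() throws SQLException {
        // Same Oracle-specific property the E2E test above sets.
        System.setProperty("oracle.jdbc.timezoneAsRegion", "false");
        return DriverManager.getConnection(jdbcUrl());
    }
}
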
@@ -18,7 +18,7 @@
package com.ververica.cdc.connectors.oracle.source;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <p>Check the Java docs of each individual method to learn more about the settings to build a
* {@link OracleIncrementalSource}.
*/
@Experimental
@Internal
public class OracleSourceBuilder<T> {
private final OracleSourceConfigFactory configFactory = new OracleSourceConfigFactory();
private RedoLogOffsetFactory offsetFactory;
@@ -219,5 +219,9 @@ public class OracleSourceBuilder<T> {
OracleDialect dataSourceDialect) {
super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect);
}
public static <T> OracleSourceBuilder<T> builder() {
return new OracleSourceBuilder<>();
}
}
}

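The new static builder() makes the parallel source constructible from the DataStream API. A hedged usage sketch, mirroring the builder calls OracleTableSource makes further down; connection values are placeholders, and JsonDebeziumDeserializationSchema is assumed available from the flink-cdc debezium module:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class OracleParallelSourceSketch {
    public static void main(String[] args) throws Exception {
        JdbcIncrementalSource<String> source =
                OracleSourceBuilder.OracleIncrementalSource.<String>builder()
                        .hostname("localhost") // placeholder connection settings
                        .port(1521)
                        .databaseList("XE")
                        .schemaList("DEBEZIUM")
                        .tableList("DEBEZIUM.PRODUCTS")
                        .username("dbzuser")
                        .password("dbz")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "OracleParallelSource").print();
        env.execute("oracle-cdc incremental snapshot sketch");
    }
}
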
@@ -36,13 +36,9 @@ public class OracleSourceConfig extends JdbcSourceConfig {
private static final long serialVersionUID = 1L;
private List<String> schemaList;
private transient OracleConnectorConfig dbzOracleConfig;
public OracleSourceConfig(
StartupOptions startupOptions,
List<String> databaseList,
List<String> schemaList,
List<String> tableList,
int splitSize,
int splitMetaGroupSize,
@@ -82,7 +78,6 @@ public class OracleSourceConfig extends JdbcSourceConfig {
connectTimeout,
connectMaxRetries,
connectionPoolSize);
this.schemaList = schemaList;
}
@Override

@@ -88,7 +88,6 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory {
return new OracleSourceConfig(
startupOptions,
databaseList,
schemaList,
tableList,
splitSize,
splitMetaGroupSize,

@@ -16,20 +16,20 @@
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.meta.events;
package com.ververica.cdc.connectors.oracle.source.config;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
import com.ververica.cdc.connectors.base.source.reader.JdbcIncrementalSourceReader;
import com.ververica.cdc.connectors.base.options.JdbcSourceOptions;
import com.ververica.cdc.connectors.oracle.OracleSource;
/**
* The {@link SourceEvent} that {@link JdbcIncrementalSourceReader} sends to {@link
* IncrementalSourceEnumerator} to ask for the latest finished snapshot splits size.
*/
public class LatestFinishedSplitsSizeRequestEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
/** Configurations for {@link OracleSource}. */
public class OracleSourceOptions extends JdbcSourceOptions {
public LatestFinishedSplitsSizeRequestEvent() {}
public static final ConfigOption<String> SCHEMA_NAME =
ConfigOptions.key("schema-name")
.stringType()
.noDefaultValue()
.withDescription("Schema name of the Oracle database to monitor.");
}

@@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.meta.events;
import org.apache.flink.api.connector.source.SourceEvent;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
/**
* The {@link SourceEvent} that {@link IncrementalSourceEnumerator} sends to {@link
* JdbcIncrementalSource} to pass the latest finished snapshot splits size.
*/
public class LatestFinishedSplitsSizeEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
private final int latestFinishedSplitsSize;
public LatestFinishedSplitsSizeEvent(int latestFinishedSplitsSize) {
this.latestFinishedSplitsSize = latestFinishedSplitsSize;
}
public int getLatestFinishedSplitsSize() {
return latestFinishedSplitsSize;
}
@Override
public String toString() {
return "LatestFinishedSplitsSizeEvent{"
+ "latestFinishedSplitsSize="
+ latestFinishedSplitsSize
+ '}';
}
}

@@ -1,151 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.meta.split;
import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/** The split to describe the redo log of Oracle table(s). */
public class OracleRedoLogSplit extends OracleSplit {
private final RedoLogOffset startingOffset;
private final RedoLogOffset endingOffset;
private final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos;
private final Map<TableId, TableChanges.TableChange> tableSchemas;
private final int totalFinishedSplitSize;
private final boolean isSuspended;
@Nullable transient byte[] serializedFormCache;
public OracleRedoLogSplit(
String splitId,
RedoLogOffset startingOffset,
RedoLogOffset endingOffset,
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos,
Map<TableId, TableChanges.TableChange> tableSchemas,
int totalFinishedSplitSize,
boolean isSuspended) {
super(splitId);
this.startingOffset = startingOffset;
this.endingOffset = endingOffset;
this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
this.tableSchemas = tableSchemas;
this.totalFinishedSplitSize = totalFinishedSplitSize;
this.isSuspended = isSuspended;
}
public OracleRedoLogSplit(
String splitId,
RedoLogOffset startingOffset,
RedoLogOffset endingOffset,
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos,
Map<TableId, TableChanges.TableChange> tableSchemas,
int totalFinishedSplitSize) {
super(splitId);
this.startingOffset = startingOffset;
this.endingOffset = endingOffset;
this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
this.tableSchemas = tableSchemas;
this.totalFinishedSplitSize = totalFinishedSplitSize;
this.isSuspended = false;
}
public RedoLogOffset getStartingOffset() {
return startingOffset;
}
public RedoLogOffset getEndingOffset() {
return endingOffset;
}
public List<FinishedSnapshotSplitInfo> getFinishedSnapshotSplitInfos() {
return finishedSnapshotSplitInfos;
}
@Override
public Map<TableId, TableChanges.TableChange> getTableSchemas() {
return tableSchemas;
}
public int getTotalFinishedSplitSize() {
return totalFinishedSplitSize;
}
public boolean isSuspended() {
return isSuspended;
}
public boolean isCompletedSplit() {
return totalFinishedSplitSize == finishedSnapshotSplitInfos.size();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof OracleRedoLogSplit)) {
return false;
}
if (!super.equals(o)) {
return false;
}
OracleRedoLogSplit that = (OracleRedoLogSplit) o;
return totalFinishedSplitSize == that.totalFinishedSplitSize
&& isSuspended == that.isSuspended
&& Objects.equals(startingOffset, that.startingOffset)
&& Objects.equals(endingOffset, that.endingOffset)
&& Objects.equals(finishedSnapshotSplitInfos, that.finishedSnapshotSplitInfos)
&& Objects.equals(tableSchemas, that.tableSchemas);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
startingOffset,
endingOffset,
finishedSnapshotSplitInfos,
tableSchemas,
totalFinishedSplitSize,
isSuspended);
}
@Override
public String toString() {
return "OracleRedoLogSplit{"
+ "splitId='"
+ splitId
+ '\''
+ ", offset="
+ startingOffset
+ ", endOffset="
+ endingOffset
+ ", isSuspended="
+ isSuspended
+ '}';
}
}

@@ -1,147 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.meta.split;
import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/** The split to describe a split of an Oracle table snapshot. */
public class OracleSnapshotSplit extends OracleSplit {
private final TableId tableId;
private final RowType splitKeyType;
private final Map<TableId, TableChanges.TableChange> tableSchemas;
@Nullable private final Object[] splitStart;
@Nullable private final Object[] splitEnd;
/** The high watermark is not null when the split read has finished. */
@Nullable private final RedoLogOffset highWatermark;
@Nullable transient byte[] serializedFormCache;
public OracleSnapshotSplit(
String splitId,
TableId tableId,
RowType splitKeyType,
Map<TableId, TableChanges.TableChange> tableSchemas,
@Nullable Object[] splitStart,
@Nullable Object[] splitEnd,
@Nullable RedoLogOffset highWatermark) {
super(splitId);
this.tableId = tableId;
this.splitKeyType = splitKeyType;
this.tableSchemas = tableSchemas;
this.splitStart = splitStart;
this.splitEnd = splitEnd;
this.highWatermark = highWatermark;
}
@Override
public Map<TableId, TableChanges.TableChange> getTableSchemas() {
return tableSchemas;
}
public TableId getTableId() {
return tableId;
}
@Nullable
public Object[] getSplitStart() {
return splitStart;
}
@Nullable
public Object[] getSplitEnd() {
return splitEnd;
}
@Nullable
public RedoLogOffset getHighWatermark() {
return highWatermark;
}
public boolean isSnapshotReadFinished() {
return highWatermark != null;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
OracleSnapshotSplit that = (OracleSnapshotSplit) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(splitKeyType, that.splitKeyType)
&& Arrays.equals(splitStart, that.splitStart)
&& Arrays.equals(splitEnd, that.splitEnd)
&& Objects.equals(highWatermark, that.highWatermark);
}
public RowType getSplitKeyType() {
return splitKeyType;
}
@Override
public int hashCode() {
int result = Objects.hash(super.hashCode(), tableId, splitKeyType, highWatermark);
result = 31 * result + Arrays.hashCode(splitStart);
result = 31 * result + Arrays.hashCode(splitEnd);
result = 31 * result + Arrays.hashCode(serializedFormCache);
return result;
}
@Override
public String toString() {
String splitKeyTypeSummary =
splitKeyType.getFields().stream()
.map(RowType.RowField::asSummaryString)
.collect(Collectors.joining(",", "[", "]"));
return "OracleSnapshotSplit{"
+ "tableId="
+ tableId
+ ", splitId='"
+ splitId
+ '\''
+ ", splitKeyType="
+ splitKeyTypeSummary
+ ", splitStart="
+ Arrays.toString(splitStart)
+ ", splitEnd="
+ Arrays.toString(splitEnd)
+ ", highWatermark="
+ highWatermark
+ '}';
}
}

@@ -1,198 +0,0 @@
/// *
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
// package com.ververica.cdc.connectors.oracle.source.meta.split;
//
// import org.apache.flink.core.memory.DataInputDeserializer;
// import org.apache.flink.core.memory.DataOutputSerializer;
// import org.apache.flink.table.types.logical.RowType;
// import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
//
// import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
// import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
// import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
// import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
// import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
// import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
// import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
// import com.ververica.cdc.connectors.base.utils.SerializerUtils;
// import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
// import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
// import io.debezium.document.Document;
// import io.debezium.document.DocumentReader;
// import io.debezium.relational.TableId;
// import io.debezium.relational.history.TableChanges;
//
// import java.io.IOException;
// import java.nio.charset.StandardCharsets;
// import java.util.ArrayList;
// import java.util.HashMap;
// import java.util.List;
// import java.util.Map;
//
/// ** An Oracle serializer for the {@link SourceSplitBase}. */
// public class OracleSourceSplitSerializer extends SourceSplitSerializer {
//
// private static final int SNAPSHOT_SPLIT_FLAG = 1;
// private static final int REDOLOG_SPLIT_FLAG = 2;
// RedoLogOffsetFactory offsetFactory;
//
// public OracleSourceSplitSerializer(RedoLogOffsetFactory offsetFactory) {
// this.offsetFactory = offsetFactory;
// }
//
// @Override
// public OffsetFactory getOffsetFactory() {
// return offsetFactory;
// }
//
// @Override
// public Offset readOffsetPosition(int offsetVersion, DataInputDeserializer in)
// throws IOException {
// return super.readOffsetPosition(offsetVersion, in);
// }
//
// @Override
// public Offset readOffsetPosition(DataInputDeserializer in) throws IOException {
// return super.readOffsetPosition(in);
// }
//
// @Override
// public void writeOffsetPosition(Offset offset, DataOutputSerializer out) throws IOException {
// super.writeOffsetPosition(offset, out);
// }
//
// @Override
// public OffsetDeserializer createOffsetDeserializer() {
// return super.createOffsetDeserializer();
// }
//
// @Override
// public FinishedSnapshotSplitInfo deserialize(byte[] serialized) {
// return super.deserialize(serialized);
// }
//
// @Override
// public byte[] serialize(SourceSplitBase split) throws IOException {
// return super.serialize(split);
// }
//
// @Override
// public SourceSplitBase deserialize(int version, byte[] serialized) throws IOException {
// return super.deserialize(version, serialized);
// }
//
// @Override
// public SourceSplitBase deserializeSplit(int version, byte[] serialized) throws IOException {
// final DataInputDeserializer in = new DataInputDeserializer(serialized);
//
// int splitKind = in.readInt();
// if (splitKind == SNAPSHOT_SPLIT_FLAG) {
// TableId tableId = TableId.parse(in.readUTF(), false);
// String splitId = in.readUTF();
// RowType splitKeyType = (RowType) LogicalTypeParser.parse(in.readUTF());
// Object[] splitBoundaryStart = SerializerUtils.serializedStringToRow(in.readUTF());
// Object[] splitBoundaryEnd = SerializerUtils.serializedStringToRow(in.readUTF());
// Offset highWatermark = readOffsetPosition(version, in);
// Map<TableId, TableChanges.TableChange> tableSchemas = readTableSchemas(version, in);
//
// return new SnapshotSplit(
// tableId,
// splitId,
// splitKeyType,
// splitBoundaryStart,
// splitBoundaryEnd,
// highWatermark,
// tableSchemas);
// } else if (splitKind == REDOLOG_SPLIT_FLAG) {
// String splitId = in.readUTF();
// // skip split Key Type
// in.readUTF();
// Offset startingOffset = readOffsetPosition(version, in);
// Offset endingOffset = readOffsetPosition(version, in);
// List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
// readFinishedSplitsInfo(version, in);
// Map<TableId, TableChanges.TableChange> tableChangeMap = readTableSchemas(version, in);
// int totalFinishedSplitSize = finishedSplitsInfo.size();
// if (version == 3) {
// totalFinishedSplitSize = in.readInt();
// }
// in.releaseArrays();
// return new StreamSplit(
// splitId,
// startingOffset,
// endingOffset,
// finishedSplitsInfo,
// tableChangeMap,
// totalFinishedSplitSize);
// } else {
// throw new IOException("Unknown split kind: " + splitKind);
// }
// }
//
// private List<FinishedSnapshotSplitInfo> readFinishedSplitsInfo(
// int version, DataInputDeserializer in) throws IOException {
// List<FinishedSnapshotSplitInfo> finishedSplitsInfo = new ArrayList<>();
// final int size = in.readInt();
// for (int i = 0; i < size; i++) {
// TableId tableId = TableId.parse(in.readUTF(), false);
// String splitId = in.readUTF();
// Object[] splitStart = SerializerUtils.serializedStringToRow(in.readUTF());
// Object[] splitEnd = SerializerUtils.serializedStringToRow(in.readUTF());
// OffsetFactory offsetFactory =
// (OffsetFactory) SerializerUtils.serializedStringToObject(in.readUTF());
// Offset highWatermark = readOffsetPosition(version, in);
//
// finishedSplitsInfo.add(
// new FinishedSnapshotSplitInfo(
// tableId, splitId, splitStart, splitEnd, highWatermark,
// offsetFactory));
// }
// return finishedSplitsInfo;
// }
//
// private static Map<TableId, TableChanges.TableChange> readTableSchemas(
// int version, DataInputDeserializer in) throws IOException {
// DocumentReader documentReader = DocumentReader.defaultReader();
// Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
// final int size = in.readInt();
// for (int i = 0; i < size; i++) {
// TableId tableId = TableId.parse(in.readUTF(), false);
// final String tableChangeStr;
// switch (version) {
// case 1:
// tableChangeStr = in.readUTF();
// break;
// case 2:
// case 3:
// final int len = in.readInt();
// final byte[] bytes = new byte[len];
// in.read(bytes);
// tableChangeStr = new String(bytes, StandardCharsets.UTF_8);
// break;
// default:
// throw new IOException("Unknown version: " + version);
// }
// Document document = documentReader.read(tableChangeStr);
// TableChanges.TableChange tableChange =
// FlinkJsonTableChangeSerializer.fromDocument(document, false);
// tableSchemas.put(tableId, tableChange);
// }
// return tableSchemas;
// }
// }

@@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.meta.split;
import org.apache.flink.api.connector.source.SourceSplit;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.Map;
import java.util.Objects;
/** A split of a table, produced by splitting the table by primary key. */
public abstract class OracleSplit implements SourceSplit {
protected final String splitId;
public OracleSplit(String splitId) {
this.splitId = splitId;
}
@Override
public String splitId() {
return splitId;
}
/** Checks whether this split is a snapshot split. */
public final boolean isSnapshotSplit() {
return getClass() == OracleSnapshotSplit.class;
}
/** Checks whether this split is a redo log split. */
public final boolean isBinlogSplit() {
return getClass() == OracleRedoLogSplit.class;
}
/** Casts this split into a {@link OracleSnapshotSplit}. */
public final OracleSnapshotSplit asSnapshotSplit() {
return (OracleSnapshotSplit) this;
}
/** Casts this split into a {@link OracleRedoLogSplit}. */
public final OracleRedoLogSplit asRedoLogSplit() {
return (OracleRedoLogSplit) this;
}
public abstract Map<TableId, TableChanges.TableChange> getTableSchemas();
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OracleSplit that = (OracleSplit) o;
return Objects.equals(splitId, that.splitId);
}
@Override
public int hashCode() {
return Objects.hash(splitId);
}
}

@@ -193,7 +193,7 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
sourcePartition,
backFillBinlogSplit,
backFillBinlogSplit.getEndingOffset(),
WatermarkKind.BINLOG_END);
WatermarkKind.END);
}
/** A wrapped task to fetch snapshot split of table. */

@@ -143,7 +143,7 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
offsetContext.getPartition(),
redoLogSplit,
currentRedoLogOffset,
JdbcSourceEventDispatcher.WatermarkKind.BINLOG_END);
JdbcSourceEventDispatcher.WatermarkKind.END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(

@@ -1,241 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.oracle.table.OracleDeserializationConverterFactory;
import com.ververica.cdc.connectors.oracle.table.OracleReadableMetaData;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link DynamicTableSource} that describes how to create an Oracle redo log source from a
* logical description.
*/
public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
private final ResolvedSchema physicalSchema;
private final int port;
private final String hostname;
private final String database;
private final String username;
private final String password;
private final String tableName;
private final String schemaName;
private final Properties dbzProperties;
private final StartupOptions startupOptions;
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
/** Data type that describes the final output of the source. */
protected DataType producedDataType;
/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
public OracleTableSource(
ResolvedSchema physicalSchema,
int port,
String hostname,
String database,
String tableName,
String schemaName,
String username,
String password,
Properties dbzProperties,
StartupOptions startupOptions) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
this.database = checkNotNull(database);
this.tableName = checkNotNull(tableName);
this.schemaName = checkNotNull(schemaName);
this.username = checkNotNull(username);
this.password = checkNotNull(password);
this.dbzProperties = dbzProperties;
this.startupOptions = startupOptions;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
DebeziumDeserializationSchema<RowData> deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(physicalDataType)
.setMetadataConverters(metadataConverters)
.setResultTypeInfo(typeInfo)
.setUserDefinedConverterFactory(
OracleDeserializationConverterFactory.instance())
.build();
JdbcIncrementalSource<RowData> oracleChangeEventSource =
new <RowData>OracleSourceBuilder()
.hostname(hostname)
.port(port)
.databaseList(database)
.schemaList(schemaName)
.tableList(schemaName + "." + tableName)
.username(username)
.password(password)
.deserializer(deserializer)
.includeSchemaChanges(true)
.debeziumProperties(dbzProperties)
.build();
return SourceProvider.of(oracleChangeEventSource);
}
private MetadataConverter[] getMetadataConverters() {
if (metadataKeys.isEmpty()) {
return new MetadataConverter[0];
}
return metadataKeys.stream()
.map(
key ->
Stream.of(OracleReadableMetaData.values())
.filter(m -> m.getKey().equals(key))
.findFirst()
.orElseThrow(IllegalStateException::new))
.map(OracleReadableMetaData::getConverter)
.toArray(MetadataConverter[]::new);
}
@Override
public DynamicTableSource copy() {
OracleTableSource source =
new OracleTableSource(
physicalSchema,
port,
hostname,
database,
tableName,
schemaName,
username,
password,
dbzProperties,
startupOptions);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OracleTableSource that = (OracleTableSource) o;
return port == that.port
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(schemaName, that.schemaName)
&& Objects.equals(dbzProperties, that.dbzProperties)
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys);
}
@Override
public int hashCode() {
return Objects.hash(
physicalSchema,
port,
hostname,
database,
username,
password,
tableName,
schemaName,
dbzProperties,
startupOptions,
producedDataType,
metadataKeys);
}
@Override
public String asSummaryString() {
return "Oracle-CDC";
}
@Override
public Map<String, DataType> listReadableMetadata() {
return Stream.of(OracleReadableMetaData.values())
.collect(
Collectors.toMap(
OracleReadableMetaData::getKey,
OracleReadableMetaData::getDataType));
}
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
this.metadataKeys = metadataKeys;
this.producedDataType = producedDataType;
}
}

@@ -1,174 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import java.util.HashSet;
import java.util.Set;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
/** Factory for creating configured instance of {@link OracleTableSource}. */
public class OracleTableSourceFactory implements DynamicTableSourceFactory {
private static final String IDENTIFIER = "oracle-cdc-new";
private static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
.withDescription("IP address or hostname of the Oracle database server.");
private static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(1521)
.withDescription("Integer port number of the Oracle database server.");
private static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription(
"Name of the Oracle database to use when connecting to the Oracle database server.");
private static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription(
"Password to use when connecting to the oracle database server.");
private static final ConfigOption<String> DATABASE_NAME =
ConfigOptions.key("database-name")
.stringType()
.noDefaultValue()
.withDescription("Database name of the Oracle server to monitor.");
private static final ConfigOption<String> SCHEMA_NAME =
ConfigOptions.key("schema-name")
.stringType()
.noDefaultValue()
.withDescription("Schema name of the Oracle database to monitor.");
private static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("Table name of the Oracle database to monitor.");
public static final ConfigOption<String> SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.stringType()
.defaultValue("initial")
.withDescription(
"Optional startup mode for Oracle CDC consumer, valid enumerations are "
+ "\"initial\", \"latest-offset\"");
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
String schemaName = config.get(SCHEMA_NAME);
int port = config.get(PORT);
StartupOptions startupOptions = getStartupOptions(config);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
return new OracleTableSource(
physicalSchema,
port,
hostname,
databaseName,
tableName,
schemaName,
username,
password,
getDebeziumProperties(context.getCatalogTable().getOptions()),
startupOptions);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(USERNAME);
options.add(PASSWORD);
options.add(DATABASE_NAME);
options.add(TABLE_NAME);
options.add(SCHEMA_NAME);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(PORT);
options.add(SCAN_STARTUP_MODE);
return options;
}
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static StartupOptions getStartupOptions(ReadableConfig config) {
String modeString = config.get(SCAN_STARTUP_MODE);
switch (modeString.toLowerCase()) {
case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
default:
throw new ValidationException(
String.format(
"Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_LATEST,
modeString));
}
}
}

@@ -1,125 +0,0 @@
/// *
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
// package com.ververica.cdc.connectors.oracle.source.utils;
//
// import org.apache.flink.core.memory.DataInputDeserializer;
// import org.apache.flink.core.memory.DataOutputSerializer;
//
// import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
// import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetSerializer;
// import io.debezium.DebeziumException;
// import io.debezium.util.HexConverter;
//
// import java.io.ByteArrayInputStream;
// import java.io.ByteArrayOutputStream;
// import java.io.IOException;
// import java.io.ObjectInputStream;
// import java.io.ObjectOutputStream;
//
/// ** Utils for serialization and deserialization. */
// public class SerializerUtils {
//
// private SerializerUtils() {}
//
// public static void writeRedoLogPosition(RedoLogOffset offset, DataOutputSerializer out)
// throws IOException {
// out.writeBoolean(offset != null);
// if (offset != null) {
// byte[] redoLogOffsetBytes = RedoLogOffsetSerializer.INSTANCE.serialize(offset);
// out.writeInt(redoLogOffsetBytes.length);
// out.write(redoLogOffsetBytes);
// }
// }
//
// public static RedoLogOffset readRedoLogPosition(int offsetVersion, DataInputDeserializer in)
// throws IOException {
// switch (offsetVersion) {
// case 1:
//// return in.readBoolean() ? new RedoLogOffset(in.readUTF(), in.readLong()) : null;
// case 2:
// case 3:
// case 4:
// return readRedoLogPosition(in);
// default:
// throw new IOException("Unknown version: " + offsetVersion);
// }
// }
//
// public static RedoLogOffset readRedoLogPosition(DataInputDeserializer in) throws IOException {
// boolean offsetNonNull = in.readBoolean();
// if (offsetNonNull) {
// int redoLogOffsetBytesLength = in.readInt();
// byte[] redoLogOffsetBytes = new byte[redoLogOffsetBytesLength];
// in.readFully(redoLogOffsetBytes);
// return RedoLogOffsetSerializer.INSTANCE.deserialize(redoLogOffsetBytes);
// } else {
// return null;
// }
// }
//
// public static String rowToSerializedString(Object[] splitBoundary) {
// try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
// ObjectOutputStream oos = new ObjectOutputStream(bos)) {
// oos.writeObject(splitBoundary);
// return HexConverter.convertToHexString(bos.toByteArray());
// } catch (IOException e) {
// throw new DebeziumException(
// String.format("Cannot serialize split boundary information %s",
// splitBoundary));
// }
// }
//
// public static String rowToSerializedString(Object splitBoundary) {
// try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
// ObjectOutputStream oos = new ObjectOutputStream(bos)) {
// oos.writeObject(splitBoundary);
// return HexConverter.convertToHexString(bos.toByteArray());
// } catch (IOException e) {
// throw new DebeziumException(
// String.format("Cannot serialize split boundary information %s",
// splitBoundary));
// }
// }
//
// public static Object[] serializedStringToRow(String serialized) {
// try (final ByteArrayInputStream bis =
// new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
// ObjectInputStream ois = new ObjectInputStream(bis)) {
// return (Object[]) ois.readObject();
// } catch (Exception e) {
// throw new DebeziumException(
// String.format(
// "Failed to deserialize split boundary with value '%s'", serialized),
// e);
// }
// }
//
// public static Object serializedStringToObject(String serialized) {
// try (final ByteArrayInputStream bis =
// new ByteArrayInputStream(HexConverter.convertFromHex(serialized));
// ObjectInputStream ois = new ObjectInputStream(bis)) {
// return ois.readObject();
// } catch (Exception e) {
// throw new DebeziumException(
// String.format(
// "Failed to deserialize split boundary with value '%s'", serialized),
// e);
// }
// }
// }

@@ -22,20 +22,21 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -53,8 +54,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
private final ResolvedSchema physicalSchema;
private final String url;
private final Integer port;
private final int port;
private final String hostname;
private final String database;
private final String username;
@@ -63,6 +63,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
private final String schemaName;
private final Properties dbzProperties;
private final StartupOptions startupOptions;
private final boolean enableParallelRead;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -76,20 +77,19 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
public OracleTableSource(
ResolvedSchema physicalSchema,
@Nullable String url,
@Nullable Integer port,
@Nullable String hostname,
int port,
String hostname,
String database,
String tableName,
String schemaName,
String username,
String password,
Properties dbzProperties,
StartupOptions startupOptions) {
StartupOptions startupOptions,
boolean enableParallelRead) {
this.physicalSchema = physicalSchema;
this.url = url;
this.port = port;
this.hostname = hostname;
this.hostname = checkNotNull(hostname);
this.database = checkNotNull(database);
this.tableName = checkNotNull(tableName);
this.schemaName = checkNotNull(schemaName);
@@ -99,6 +99,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
this.startupOptions = startupOptions;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
this.enableParallelRead = enableParallelRead;
}
@Override
@@ -113,6 +114,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
@@ -126,22 +128,39 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
.setUserDefinedConverterFactory(
OracleDeserializationConverterFactory.instance())
.build();
OracleSource.Builder<RowData> builder =
OracleSource.<RowData>builder()
.url(url)
.hostname(hostname)
.port(port)
.database(database)
.tableList(schemaName + "." + tableName)
.schemaList(schemaName)
.username(username)
.password(password)
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);
if (enableParallelRead) {
JdbcIncrementalSource<RowData> oracleChangeEventSource =
OracleSourceBuilder.OracleIncrementalSource.<RowData>builder()
.hostname(hostname)
.port(port)
.databaseList(database)
.schemaList(schemaName)
.tableList(schemaName + "." + tableName)
.username(username)
.password(password)
.deserializer(deserializer)
.debeziumProperties(dbzProperties)
.build();
return SourceProvider.of(oracleChangeEventSource);
} else {
OracleSource.Builder<RowData> builder =
OracleSource.<RowData>builder()
.hostname(hostname)
.port(port)
.database(database)
.tableList(schemaName + "." + tableName)
.schemaList(schemaName)
.username(username)
.password(password)
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
}
private MetadataConverter[] getMetadataConverters() {
@@ -165,7 +184,6 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
OracleTableSource source =
new OracleTableSource(
physicalSchema,
url,
port,
hostname,
database,
@@ -174,7 +192,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
username,
password,
dbzProperties,
startupOptions);
startupOptions,
enableParallelRead);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -191,7 +210,6 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
OracleTableSource that = (OracleTableSource) o;
return Objects.equals(port, that.port)
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(url, that.url)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
&& Objects.equals(username, that.username)
@@ -201,14 +219,14 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
&& Objects.equals(dbzProperties, that.dbzProperties)
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys);
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(enableParallelRead, that.enableParallelRead);
}
@Override
public int hashCode() {
return Objects.hash(
physicalSchema,
url,
port,
hostname,
database,

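With enableParallelRead threaded through, getScanRuntimeProvider now branches between the new SourceProvider-based parallel source and the legacy SourceFunctionProvider path. End to end, a user flips the switch in DDL; a sketch (connection values are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OracleParallelReadDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // The two scan.incremental.* options are the ones this commit exposes
        // on the 'oracle-cdc' factory; everything else is a placeholder.
        tEnv.executeSql(
                "CREATE TABLE products_source ("
                        + " ID INT NOT NULL,"
                        + " NAME STRING,"
                        + " PRIMARY KEY (ID) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'oracle-cdc',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '1521',"
                        + " 'username' = 'dbzuser',"
                        + " 'password' = 'dbz',"
                        + " 'database-name' = 'XE',"
                        + " 'schema-name' = 'DEBEZIUM',"
                        + " 'table-name' = 'PRODUCTS',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        + " 'scan.incremental.snapshot.chunk.size' = '4'"
                        + ")");
    }
}
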
@@ -17,7 +17,6 @@
package com.ververica.cdc.connectors.oracle.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -30,76 +29,27 @@ import com.ververica.cdc.debezium.table.DebeziumOptions;
import java.util.HashSet;
import java.util.Set;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.DATABASE_NAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PORT;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
import static com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions.SCHEMA_NAME;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Factory for creating configured instance of {@link
* com.ververica.cdc.connectors.oracle.table.OracleTableSource}.
*/
/** Factory for creating configured instance of {@link OracleTableSource}. */
public class OracleTableSourceFactory implements DynamicTableSourceFactory {
private static final String IDENTIFIER = "oracle-cdc";
private static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
.withDescription("IP address or hostname of the Oracle database server.");
private static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(1521)
.withDescription("Integer port number of the Oracle database server.");
private static final ConfigOption<String> URL =
ConfigOptions.key("url")
.stringType()
.noDefaultValue()
.withDescription(
"Complete JDBC URL as an alternative to specifying hostname, port and database provided as a way to support alternative connection scenarios.");
private static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription(
"Name of the Oracle database to use when connecting to the Oracle database server.");
private static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription(
"Password to use when connecting to the oracle database server.");
private static final ConfigOption<String> DATABASE_NAME =
ConfigOptions.key("database-name")
.stringType()
.noDefaultValue()
.withDescription("Database name of the Oracle server to monitor.");
private static final ConfigOption<String> SCHEMA_NAME =
ConfigOptions.key("schema-name")
.stringType()
.noDefaultValue()
.withDescription("Schema name of the Oracle database to monitor.");
private static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("Table name of the Oracle database to monitor.");
public static final ConfigOption<String> SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.stringType()
.defaultValue("initial")
.withDescription(
"Optional startup mode for Oracle CDC consumer, valid enumerations are "
+ "\"initial\", \"latest-offset\"");
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
@@ -113,17 +63,25 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
String schemaName = config.get(SCHEMA_NAME);
String url = config.get(URL);
Integer port = config.get(PORT);
int port = config.get(PORT);
StartupOptions startupOptions = getStartupOptions(config);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
if (url == null) {
checkNotNull(hostname, "hostname is required when url is not configured");
checkNotNull(port, "port is required when url is not configured");
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
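// Validate the incremental-snapshot options only when parallel reading is on:
// chunk size, fetch size and connection pool size must each be larger than 1,
// and the connect retry count must be positive (see validateIntegerOption below).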
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1);
validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0);
}
return new OracleTableSource(
physicalSchema,
url,
port,
hostname,
databaseName,
@ -132,7 +90,8 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
username,
password,
getDebeziumProperties(context.getCatalogTable().getOptions()),
startupOptions);
startupOptions,
enableParallelRead);
}
@Override
@ -143,6 +102,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
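// hostname is required here (it was optional before) because this factory
// no longer accepts a complete JDBC 'url' as an alternative.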
options.add(HOSTNAME);
options.add(USERNAME);
options.add(PASSWORD);
options.add(DATABASE_NAME);
@ -154,11 +114,12 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(PORT);
options.add(URL);
options.add(SCAN_STARTUP_MODE);
options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
options.add(CONNECT_MAX_RETRIES);
options.add(CONNECTION_POOL_SIZE);
return options;
}
@ -185,4 +146,14 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
modeString));
}
}
/** Checks that the value of the given integer option is valid. */
private void validateIntegerOption(
ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
checkState(
optionValue > exclusiveMin,
String.format(
"The value of option '%s' must larger than %d, but is %d",
option.key(), exclusiveMin, optionValue));
}
}
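
A minimal Flink SQL sketch of how the new option surfaces to users follows; the host, credentials and table are hypothetical, and the option keys are assumed to follow the base SourceOptions constants imported above (the tests below expect scan.incremental.snapshot.enabled to default to true):

CREATE TABLE products_source (
    ID INT NOT NULL,
    NAME STRING,
    WEIGHT DECIMAL(10, 3),
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'XE',
    'schema-name' = 'DEBEZIUM',
    'table-name' = 'PRODUCTS',
    'scan.startup.mode' = 'initial',
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.incremental.snapshot.chunk-size' = '8096'
);

Setting 'scan.incremental.snapshot.enabled' = 'false' presumably falls back to the single-reader Debezium source, in which case the chunk-size, fetch-size and pool-size validations above are skipped.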
@ -12,4 +12,3 @@
# limitations under the License.
com.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory
com.ververica.cdc.connectors.oracle.source.table.OracleTableSourceFactory
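
This registration file is presumably META-INF/services/org.apache.flink.table.factories.Factory, the standard Java ServiceLoader hook through which Flink discovers table factories. A minimal, illustrative sketch of the lookup (not Flink's actual discovery code):

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class FactoryLookupSketch {
    public static void main(String[] args) {
        // Scans all META-INF/services registrations for Factory implementations
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            // factoryIdentifier() returns "oracle-cdc" for the factory registered above
            if ("oracle-cdc".equals(factory.factoryIdentifier())) {
                System.out.println("Resolved factory: " + factory.getClass().getName());
            }
        }
    }
}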
@ -138,7 +138,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
+ " PHONE_NUMBER STRING,"
+ " primary key (ID) not enforced"
+ ") WITH ("
+ " 'connector' = 'oracle-cdc-new',"
+ " 'connector' = 'oracle-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
@ -263,7 +263,7 @@ public class OracleSourceITCase extends OracleSourceTestBase {
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
LOG.info("fetch row:{}", row.toString());
LOG.info("fetch row:{}", row);
size--;
}
return rows;
@ -27,15 +27,10 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.junit.Test;
import java.util.ArrayList;
@ -45,7 +40,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -81,6 +75,7 @@ public class OracleTableSourceFactoryTest {
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
private static final int MY_PORT = 1521;
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
@ -91,41 +86,23 @@ public class OracleTableSourceFactoryTest {
@Test
public void testRequiredProperties() {
try {
Map<String, String> properties = getAllRequiredOptions();
// validation for source
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t, "hostname is required when url is not configured")
.isPresent());
}
}
@Test
public void testRequiredPropertiesWithUrl() {
String url = "jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE;
Map<String, String> properties = getAllRequiredOptions();
properties.put("url", url);
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
url,
1521,
null,
MY_PORT,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_SCHEMA,
MY_USERNAME,
MY_PASSWORD,
PROPERTIES,
StartupOptions.initial());
StartupOptions.initial(),
true);
assertEquals(expectedSource, actualSource);
}
@ -138,8 +115,7 @@ public class OracleTableSourceFactoryTest {
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
null,
1521,
MY_PORT,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
@ -147,7 +123,8 @@ public class OracleTableSourceFactoryTest {
MY_USERNAME,
MY_PASSWORD,
PROPERTIES,
StartupOptions.initial());
StartupOptions.initial(),
true);
assertEquals(expectedSource, actualSource);
}
@ -156,7 +133,6 @@ public class OracleTableSourceFactoryTest {
Map<String, String> options = getAllRequiredOptions();
options.put("port", "1521");
options.put("hostname", MY_LOCALHOST);
options.put("url", "jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE);
options.put("debezium.snapshot.mode", "initial");
DynamicTableSource actualSource = createTableSource(options);
@ -165,8 +141,7 @@ public class OracleTableSourceFactoryTest {
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
"jdbc:oracle:thin:@" + MY_LOCALHOST + ":1521" + ":" + MY_DATABASE,
1521,
MY_PORT,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
@ -174,7 +149,8 @@ public class OracleTableSourceFactoryTest {
MY_USERNAME,
MY_PASSWORD,
dbzProperties,
StartupOptions.initial());
StartupOptions.initial(),
true);
assertEquals(expectedSource, actualSource);
}
@ -188,8 +164,7 @@ public class OracleTableSourceFactoryTest {
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
null,
1521,
MY_PORT,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
@ -197,7 +172,8 @@ public class OracleTableSourceFactoryTest {
MY_USERNAME,
MY_PASSWORD,
PROPERTIES,
StartupOptions.initial());
StartupOptions.initial(),
true);
assertEquals(expectedSource, actualSource);
}
@ -211,8 +187,7 @@ public class OracleTableSourceFactoryTest {
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA,
null,
1521,
MY_PORT,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
@ -220,13 +195,14 @@ public class OracleTableSourceFactoryTest {
MY_USERNAME,
MY_PASSWORD,
PROPERTIES,
StartupOptions.latest());
StartupOptions.latest(),
true);
assertEquals(expectedSource, actualSource);
}
@Test
public void testMetadataColumns() {
Map<String, String> properties = getAllRequiredOptionsWithHost();
Map<String, String> properties = getAllRequiredOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
@ -238,8 +214,7 @@ public class OracleTableSourceFactoryTest {
OracleTableSource expectedSource =
new OracleTableSource(
SCHEMA_WITH_METADATA,
null,
1521,
MY_PORT,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
@ -247,19 +222,13 @@ public class OracleTableSourceFactoryTest {
MY_USERNAME,
MY_PASSWORD,
new Properties(),
StartupOptions.initial());
StartupOptions.initial(),
true);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "table_name", "schema_name");
assertEquals(expectedSource, actualSource);
ScanTableSource.ScanRuntimeProvider provider =
oracleTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
DebeziumSourceFunction<RowData> debeziumSourceFunction =
(DebeziumSourceFunction<RowData>)
((SourceFunctionProvider) provider).createSourceFunction();
assertProducedTypeOfSourceFunction(debeziumSourceFunction, expectedSource.producedDataType);
}
@Test
@ -329,6 +298,8 @@ public class OracleTableSourceFactoryTest {
private Map<String, String> getAllRequiredOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "oracle-cdc");
options.put("port", "1521");
options.put("hostname", MY_LOCALHOST);
options.put("database-name", MY_DATABASE);
options.put("table-name", MY_TABLE);
options.put("username", MY_USERNAME);